From bec8d69f9e208b015171824320f03d2f38a883b2 Mon Sep 17 00:00:00 2001 From: Niko Date: Thu, 4 May 2023 15:55:38 +0300 Subject: [PATCH] some cleanup and license headers --- Cargo.lock | 87 +--- Cargo.toml | 2 +- README.md | 2 +- ng-app-js/Cargo.toml | 6 +- ng-app-js/src/client_connection.rs | 3 - ng-app-js/src/lib.rs | 38 +- ngcli/Cargo.toml | 1 - ngcli/src/main.rs | 69 ++-- p2p-broker/src/auth.rs | 180 --------- p2p-broker/src/connection_local.rs | 147 ------- p2p-broker/src/lib.rs | 7 - p2p-broker/src/server.rs | 2 - p2p-broker/src/server_connection.rs | 3 - p2p-broker/src/server_ws.rs | 164 ++------ p2p-client-ws/Cargo.toml | 1 - p2p-client-ws/src/connection_ws.rs | 93 ----- p2p-client-ws/src/lib.rs | 2 - p2p-client-ws/src/remote_ws.rs | 86 +--- p2p-client-ws/src/remote_ws_wasm.rs | 31 -- p2p-client/Cargo.toml | 25 -- p2p-client/src/connection_remote.rs | 598 ---------------------------- p2p-client/src/lib.rs | 47 --- p2p-net/src/actor.rs | 53 +-- p2p-net/src/actors/noise.rs | 17 +- p2p-net/src/actors/start.rs | 23 +- p2p-net/src/broker.rs | 17 +- p2p-net/src/connection.rs | 13 +- p2p-net/src/lib.rs | 11 + p2p-net/src/utils.rs | 11 + 29 files changed, 182 insertions(+), 1557 deletions(-) delete mode 100644 ng-app-js/src/client_connection.rs delete mode 100644 p2p-broker/src/auth.rs delete mode 100644 p2p-broker/src/connection_local.rs delete mode 100644 p2p-broker/src/server_connection.rs delete mode 100644 p2p-client-ws/src/connection_ws.rs delete mode 100644 p2p-client/Cargo.toml delete mode 100644 p2p-client/src/connection_remote.rs delete mode 100644 p2p-client/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 327cbaf..64a94c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -554,20 +554,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "curve25519-dalek" -version = "4.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d4ba9852b42210c7538b75484f9daa0655e9a3ac04f693747bb0f02cf3cfe16" -dependencies = [ - "cfg-if", - "fiat-crypto", - "packed_simd_2", - "platforms", - "subtle", - "zeroize", -] - [[package]] name = "debug_print" version = "1.0.0" @@ -621,7 +607,7 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" dependencies = [ - "curve25519-dalek 3.2.1", + "curve25519-dalek", "ed25519", "rand 0.7.3", "serde", @@ -689,12 +675,6 @@ dependencies = [ "instant", ] -[[package]] -name = "fiat-crypto" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93ace6ec7cc19c8ed33a32eaa9ea692d7faea05006b5356b9e2b668ec4bc3955" - [[package]] name = "fnv" version = "1.0.7" @@ -1009,12 +989,6 @@ version = "0.2.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" -[[package]] -name = "libm" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a" - [[package]] name = "linux-raw-sys" version = "0.1.4" @@ -1077,7 +1051,6 @@ dependencies = [ "futures", "getrandom 0.1.16", "gloo-timers", - "js-sys", "p2p-client-ws", "p2p-net", "p2p-repo", @@ -1085,11 +1058,9 @@ dependencies = [ "serde", "serde_bare", "serde_bytes", - "snow", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", - "web-sys", "ws_stream_wasm", ] @@ -1104,7 +1075,6 @@ dependencies = [ "fastbloom-rs", "futures", "p2p-broker", - "p2p-client", "p2p-client-ws", "p2p-net", "p2p-repo", @@ -1224,26 +1194,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "p2p-client" -version = "0.1.0" -dependencies = [ - "async-channel", - "async-oneshot", - "async-std", - "async-trait", - "chacha20 0.9.0", - "debug_print", - "futures", - "p2p-net", - "p2p-repo", - "serde", - "serde_bare", - "serde_bytes", - "snow", - "xactor", -] - [[package]] name = "p2p-client-ws" version = "0.1.0" @@ -1257,7 +1207,6 @@ dependencies = [ "debug_print", "futures", "getrandom 0.2.8", - "p2p-client", "p2p-net", "p2p-repo", "pharos", @@ -1336,16 +1285,6 @@ dependencies = [ "serde_bytes", ] -[[package]] -name = "packed_simd_2" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1914cd452d8fccd6f9db48147b29fd4ae05bea9dc5d9ad578509f72415de282" -dependencies = [ - "cfg-if", - "libm", -] - [[package]] name = "parking" version = "2.0.0" @@ -1415,12 +1354,6 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" -[[package]] -name = "platforms" -version = "3.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d7ddaed09e0eb771a79ab0fd64609ba0afb0a8366421957936ad14cbd13630" - [[package]] name = "polling" version = "2.5.2" @@ -1807,22 +1740,6 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" -[[package]] -name = "snow" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ccba027ba85743e09d15c03296797cad56395089b832b48b5a5217880f57733" -dependencies = [ - "aes-gcm", - "blake2", - "chacha20poly1305", - "curve25519-dalek 4.0.0-rc.1", - "rand_core 0.6.4", - "rustc_version", - "sha2 0.10.6", - "subtle", -] - [[package]] name = "socket2" version = "0.4.9" @@ -2327,7 +2244,7 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077" dependencies = [ - "curve25519-dalek 3.2.1", + "curve25519-dalek", "rand_core 0.5.1", "zeroize", ] diff --git a/Cargo.toml b/Cargo.toml index b9b968b..ec671cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "p2p-repo", "p2p-net", "p2p-broker", - "p2p-client", + "p2p-client-ws", "p2p-verifier", "p2p-stores-lmdb", "ngcli", diff --git a/README.md b/README.md index 3fee3cf..6c28c15 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ The crates are organized as follow : - p2p-repo : all the common types, traits and structs for the P2P repositories - p2p-net : all the common types, traits and structs for the P2P networks - p2p-broker : the broker code (as server and core peer) -- p2p-client : the client connecting to a broker, used by the apps and verifier +- p2p-client-ws : the client connecting to a broker, used by the apps and verifier - p2p-stores-lmdb : lmdb backed stores for the p2p layer - p2p-verifier : the code of the verifier - ngcli : CLI tool to manipulate the repos diff --git a/ng-app-js/Cargo.toml b/ng-app-js/Cargo.toml index ec425bf..81f9993 100644 --- a/ng-app-js/Cargo.toml +++ b/ng-app-js/Cargo.toml @@ -26,7 +26,7 @@ debug_print = "1.0.0" serde = { version = "1.0", features = ["derive"] } serde_bare = "0.5.0" serde_bytes = "0.11.7" -snow = "0.9.2" +# snow = "0.9.2" getrandom = { version = "0.1.1", features = ["wasm-bindgen"] } @@ -35,9 +35,9 @@ getrandom = { version = "0.1.1", features = ["wasm-bindgen"] } # features = ["js"] [target.'cfg(target_arch = "wasm32")'.dependencies] -js-sys = "0.3.61" +# js-sys = "0.3.61" wasm-bindgen-futures = "0.4.34" -web-sys = { version = "0.3.61", features = ["Window"] } +# web-sys = { version = "0.3.61", features = ["Window"] } gloo-timers = "0.2.6" [dev-dependencies] diff --git a/ng-app-js/src/client_connection.rs b/ng-app-js/src/client_connection.rs deleted file mode 100644 index 08af2b3..0000000 --- a/ng-app-js/src/client_connection.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub struct ClientConnection {} - -impl ClientConnection {} diff --git a/ng-app-js/src/lib.rs b/ng-app-js/src/lib.rs index d6fce95..a60f628 100644 --- a/ng-app-js/src/lib.rs +++ b/ng-app-js/src/lib.rs @@ -1,6 +1,17 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + use async_std::task; -#[cfg(target_arch = "wasm32")] -use js_sys::Reflect; +// #[cfg(target_arch = "wasm32")] +// use js_sys::Reflect; #[cfg(target_arch = "wasm32")] use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket; use p2p_net::broker::*; @@ -22,13 +33,11 @@ extern "C" { #[cfg(target_arch = "wasm32")] #[wasm_bindgen] -pub async fn greet(name: &str) { - log!("I say: {}", name); - let mut random_buf = [0u8; 32]; - getrandom::getrandom(&mut random_buf).unwrap(); +pub async fn start() { + // let mut random_buf = [0u8; 32]; + // getrandom::getrandom(&mut random_buf).unwrap(); - //spawn_and_log_error(testt("ws://127.0.0.1:3012")); - async fn method() -> ResultSend<()> { + async fn inner_task() -> ResultSend<()> { let server_key = PubKey::Ed25519PubKey([ 22, 140, 190, 111, 82, 151, 27, 133, 83, 121, 71, 36, 209, 53, 53, 114, 52, 254, 218, 241, 52, 155, 231, 83, 188, 189, 47, 135, 105, 213, 39, 91, @@ -88,14 +97,13 @@ pub async fn greet(name: &str) { Ok(()) } - spawn_and_log_error(method()).await; - //spawn_and_log_error(Arc::clone(&cnx).open("ws://127.0.0.1:3012", priv_key, pub_key)); + spawn_and_log_error(inner_task()).await; } #[cfg(not(target_arch = "wasm32"))] #[wasm_bindgen] -pub fn greet(name: &str) { - alert(&format!("I say: {}", name)); +pub fn start() { + //alert(&format!("I say: {}", name)); task::spawn(async move {}); } @@ -111,10 +119,10 @@ pub fn change(name: &str) -> JsValue { mod test { use wasm_bindgen_test::*; wasm_bindgen_test_configure!(run_in_browser); - use crate::greet; + use crate::start; #[wasm_bindgen_test] - pub async fn test_greet() { - greet("test").await; + pub async fn test_connection() { + start().await; } } diff --git a/ngcli/Cargo.toml b/ngcli/Cargo.toml index a6864e7..2c4d6ae 100644 --- a/ngcli/Cargo.toml +++ b/ngcli/Cargo.toml @@ -12,7 +12,6 @@ debug_print = "1.0.0" p2p-repo = { path = "../p2p-repo" } p2p-net = { path = "../p2p-net" } p2p-client-ws = { path = "../p2p-client-ws" } -p2p-client = { path = "../p2p-client" } p2p-broker = { path = "../p2p-broker" } p2p-stores-lmdb = { path = "../p2p-stores-lmdb" } async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/ngcli/src/main.rs b/ngcli/src/main.rs index fea7d27..31d43c8 100644 --- a/ngcli/src/main.rs +++ b/ngcli/src/main.rs @@ -3,7 +3,7 @@ // This code is partly derived from work written by TG x Thoth from P2Pcollab. // Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 -// +// // or the MIT license , // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except @@ -13,23 +13,22 @@ use debug_print::*; use ed25519_dalek::*; use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership}; use futures::{future, pin_mut, stream, SinkExt, StreamExt}; +use p2p_broker::broker_store::config::ConfigMode; use p2p_repo::object::Object; use p2p_repo::store::{store_max_value_size, store_valid_value_size, HashMapRepoStore, RepoStore}; -use p2p_broker::broker_store::config::ConfigMode; use p2p_stores_lmdb::broker_store::LmdbBrokerStore; use p2p_stores_lmdb::repo_store::LmdbRepoStore; use rand::rngs::OsRng; use std::collections::HashMap; -use p2p_repo::types::*; -use p2p_repo::utils::{generate_keypair, now_timestamp}; -use p2p_broker::server_ws::*; use p2p_broker::server::*; +use p2p_broker::server_ws::*; +use p2p_net::broker_connection::*; use p2p_net::errors::*; use p2p_net::types::*; -use p2p_net::broker_connection::*; -use p2p_client::connection_remote::*; -use p2p_client_ws::connection_ws::*; +use p2p_repo::types::*; +use p2p_repo::utils::{generate_keypair, now_timestamp}; + use p2p_broker::connection_local::*; fn block_size() -> usize { @@ -450,15 +449,22 @@ async fn test_sync(cnx: &mut impl BrokerConnection, user_pub_key: PubKey, userpr // now the client can verify the DAG and each commit. Then update its list of heads. } -async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKey) -> Result<(), ProtocolError>{ - - cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key).await?; +async fn test( + cnx: &mut impl BrokerConnection, + pub_key: PubKey, + priv_key: PrivKey, +) -> Result<(), ProtocolError> { + cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key) + .await?; cnx.add_user(pub_key, priv_key).await?; //.expect("add_user 2 (myself) failed"); assert_eq!( - cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key).await.err().unwrap(), + cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key) + .await + .err() + .unwrap(), ProtocolError::UserAlreadyExists ); @@ -467,9 +473,7 @@ async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKe secret: SymKey::ChaCha20Key([0; 32]), peers: vec![], }); - let mut public_overlay_cnx = cnx - .overlay_connect(&repo, true) - .await?; + let mut public_overlay_cnx = cnx.overlay_connect(&repo, true).await?; debug_println!("put_block"); @@ -505,25 +509,21 @@ async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKe let mut my_block_stream = public_overlay_cnx .get_block(my_block_id, true, None) .await?; - //.expect("get_block failed"); + //.expect("get_block failed"); while let Some(b) = my_block_stream.next().await { debug_println!("GOT BLOCK {}", b.id()); } - let mut my_object_stream = public_overlay_cnx - .get_block(object_id, true, None) - .await?; - //.expect("get_block for object failed"); + let mut my_object_stream = public_overlay_cnx.get_block(object_id, true, None).await?; + //.expect("get_block for object failed"); while let Some(b) = my_object_stream.next().await { debug_println!("GOT BLOCK {}", b.id()); } - let object = public_overlay_cnx - .get_object(object_id, None) - .await?; - //.expect("get_object failed"); + let object = public_overlay_cnx.get_object(object_id, None).await?; + //.expect("get_object failed"); debug_println!("GOT OBJECT with ID {}", object.id()); @@ -534,19 +534,17 @@ async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKe // debug_println!("COPIED OBJECT to OBJECT ID {}", object_id); - public_overlay_cnx - .delete_object(object_id) - .await?; - //.expect("delete_object failed"); + public_overlay_cnx.delete_object(object_id).await?; + //.expect("delete_object failed"); let res = public_overlay_cnx .get_object(object_id, None) .await .unwrap_err(); - + debug_println!("result from get object after delete: {}", res); assert_eq!(res, ProtocolError::NotFound); - + //TODO test pin/unpin // TEST BRANCH SYNC @@ -586,16 +584,12 @@ async fn test_remote_connection(url: &str) { Ok(mut cnx) => { if let Err(e) = test(&mut cnx, pub_key, priv_key).await { debug_println!("error: {:?}", e) - } - else { + } else { cnx.close().await; - - } } - Err(e) => { - + } } + Err(e) => {} } - } #[xactor::main] @@ -624,7 +618,6 @@ mod test { #[async_std::test] pub async fn test_remote_cnx() -> Result<(), Box> { - let thr = task::spawn(run_server_accept_one("127.0.0.1:3012")); std::thread::sleep(std::time::Duration::from_secs(2)); diff --git a/p2p-broker/src/auth.rs b/p2p-broker/src/auth.rs deleted file mode 100644 index fbb1678..0000000 --- a/p2p-broker/src/auth.rs +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers -// All rights reserved. -// Licensed under the Apache License, Version 2.0 -// -// or the MIT license , -// at your option. All files in the project carrying such -// notice may not be copied, modified, or distributed except -// according to those terms. - -use std::pin::Pin; - -use debug_print::*; -use futures::future::BoxFuture; -use futures::future::OptionFuture; -use futures::FutureExt; -use p2p_net::actors::*; -use p2p_net::errors::*; -use p2p_net::types::*; -use p2p_repo::types::*; -use p2p_repo::utils::*; -use rust_fsm::*; - -// state_machine! { -// derive(Debug) -// AuthProtocolClient(Ready) - -// Ready(ClientHelloSent) => ClientHelloSent, -// ClientHelloSent(ServerHelloReceived) => ServerHelloReceived, -// ServerHelloReceived(ClientAuthSent) => ClientAuthSent, -// ClientAuthSent(AuthResultReceived) => AuthResult, -// AuthResult => { -// Ok => BrokerProtocol, -// Error => Closed, -// }, -// } - -// state_machine! { -// derive(Debug) -// AuthProtocolServer(Ready) - -// Ready(ClientHelloReceived) => ClientHelloReceived, -// ClientHelloReceived(ServerHelloSent) => ServerHelloSent, -// ServerHelloSent(ClientAuthReceived) => ClientAuthReceived, -// ClientAuthReceived => { -// Ok => AuthResultOk, -// Error => AuthResultError, -// }, -// AuthResultOk(AuthResultSent) => BrokerProtocol, -// AuthResultError(AuthResultSent) => Closed, -// } - -#[derive(Debug)] -pub struct AuthProtocolHandler { - //machine: StateMachine, - nonce: Option>, - user: Option, -} - -impl AuthProtocolHandler { - pub fn new() -> AuthProtocolHandler { - AuthProtocolHandler { - //machine: StateMachine::new(), - nonce: None, - user: None, - } - } - - pub fn get_user(&self) -> Option { - self.user - } - - pub fn handle_init(&mut self, client_hello: ClientHello) -> Result, ProtocolError> { - // let _ = self - // .machine - // .consume(&AuthProtocolServerInput::ClientHelloReceived) - // .map_err(|_e| ProtocolError::InvalidState)?; - - let mut random_buf = [0u8; 32]; - getrandom::getrandom(&mut random_buf).unwrap(); - let nonce = random_buf.to_vec(); - let reply = ServerHello::V0(ServerHelloV0 { - nonce: nonce.clone(), - }); - self.nonce = Some(nonce); - - // let _ = self - // .machine - // .consume(&AuthProtocolServerInput::ServerHelloSent) - // .map_err(|_e| ProtocolError::InvalidState)?; - - //debug_println!("sending nonce to client: {:?}", self.nonce); - - Ok(serde_bare::to_vec(&reply).unwrap()) - } - - pub fn handle_incoming( - &mut self, - frame: Vec, - ) -> ( - Result, ProtocolError>, - Pin>>>, - ) { - fn prepare_reply(res: Result, ProtocolError>) -> AuthResult { - let (result, metadata) = match res { - Ok(m) => (0, m), - Err(e) => (e.into(), vec![]), - }; - AuthResult::V0(AuthResultV0 { result, metadata }) - } - - fn process_state( - handler: &mut AuthProtocolHandler, - frame: Vec, - ) -> Result, ProtocolError> { - // match handler.machine.state() { - // &AuthProtocolServerState::ServerHelloSent => { - let message = serde_bare::from_slice::(&frame)?; - // let _ = handler - // .machine - // .consume(&AuthProtocolServerInput::ClientAuthReceived) - // .map_err(|_e| ProtocolError::InvalidState)?; - - // verifying client auth - - debug_println!("verifying client auth"); - - let _ = verify( - &serde_bare::to_vec(&message.content_v0()).unwrap(), - message.sig(), - message.user(), - ) - .map_err(|_e| ProtocolError::AccessDenied)?; - - // debug_println!( - // "matching nonce : {:?} {:?}", - // message.nonce(), - // handler.nonce.as_ref().unwrap() - // ); - - if message.nonce() != handler.nonce.as_ref().unwrap() { - // let _ = handler - // .machine - // .consume(&AuthProtocolServerInput::Error) - // .map_err(|_e| ProtocolError::InvalidState); - - return Err(ProtocolError::AccessDenied); - } - - // TODO check that the device has been registered for this user. if not, return AccessDenied - - // all is good, we advance the FSM and send back response - // let _ = handler - // .machine - // .consume(&AuthProtocolServerInput::Ok) - // .map_err(|_e| ProtocolError::InvalidState)?; - - handler.user = Some(message.user()); - - Ok(vec![]) // without any metadata - //} - //_ => Err(ProtocolError::InvalidState), - //} - } - - let res = process_state(self, frame); - let is_err = res.as_ref().err().cloned(); - let reply = prepare_reply(res); - let reply_ser: Result, ProtocolError> = Ok(serde_bare::to_vec(&reply).unwrap()); - if is_err.is_some() { - ( - reply_ser, - Box::pin(OptionFuture::from(Some( - async move { reply.result() }.boxed(), - ))), - ) - } else { - (reply_ser, Box::pin(OptionFuture::from(None))) - } - } -} diff --git a/p2p-broker/src/connection_local.rs b/p2p-broker/src/connection_local.rs deleted file mode 100644 index 3eccaf7..0000000 --- a/p2p-broker/src/connection_local.rs +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers -// All rights reserved. -// Licensed under the Apache License, Version 2.0 -// -// or the MIT license , -// at your option. All files in the project carrying such -// notice may not be copied, modified, or distributed except -// according to those terms. - -//! Local Connection to a Broker - -use futures::{ - ready, - stream::Stream, - task::{Context, Poll}, - Future, - select, FutureExt, -}; -use futures::channel::mpsc; -use std::pin::Pin; -use std::{collections::HashSet, fmt::Debug}; - -use crate::server::BrokerServer; -use debug_print::*; -use futures::{pin_mut, stream, Sink, SinkExt, StreamExt}; -use p2p_repo::object::*; -use p2p_repo::store::*; -use p2p_repo::types::*; -use p2p_repo::utils::*; -use p2p_net::errors::*; -use p2p_net::types::*; -use p2p_net::broker_connection::*; -use std::collections::HashMap; - - -pub struct BrokerConnectionLocal<'a> { - broker: &'a mut BrokerServer, - user: PubKey, -} - -#[async_trait::async_trait] -impl<'a> BrokerConnection for BrokerConnectionLocal<'a> { - type OC = BrokerConnectionLocal<'a>; - type BlockStream = async_channel::Receiver; - - async fn close(&mut self) {} - - async fn add_user( - &mut self, - user_id: PubKey, - admin_user_pk: PrivKey, - ) -> Result<(), ProtocolError> { - let op_content = AddUserContentV0 { user: user_id }; - let sig = sign(admin_user_pk, self.user, &serde_bare::to_vec(&op_content)?)?; - - self.broker.add_user(self.user, user_id, sig) - } - - async fn process_overlay_request( - &mut self, - overlay: OverlayId, - request: BrokerOverlayRequestContentV0, - ) -> Result<(), ProtocolError> { - match request { - BrokerOverlayRequestContentV0::OverlayConnect(_) => { - self.broker.connect_overlay(self.user, overlay) - } - BrokerOverlayRequestContentV0::OverlayJoin(j) => { - self.broker - .join_overlay(self.user, overlay, j.repo_pubkey(), j.secret(), j.peers()) - } - BrokerOverlayRequestContentV0::ObjectPin(op) => { - self.broker.pin_object(self.user, overlay, op.id()) - } - BrokerOverlayRequestContentV0::ObjectUnpin(op) => { - self.broker.unpin_object(self.user, overlay, op.id()) - } - BrokerOverlayRequestContentV0::ObjectDel(op) => { - self.broker.del_object(self.user, overlay, op.id()) - } - BrokerOverlayRequestContentV0::BlockPut(b) => { - self.broker.put_block(self.user, overlay, b.block()) - } - _ => Err(ProtocolError::InvalidState), - } - } - - async fn process_overlay_request_objectid_response( - &mut self, - overlay: OverlayId, - request: BrokerOverlayRequestContentV0, - ) -> Result { - match request { - BrokerOverlayRequestContentV0::ObjectCopy(oc) => { - self.broker - .copy_object(self.user, overlay, oc.id(), oc.expiry()) - } - _ => Err(ProtocolError::InvalidState), - } - } - - async fn process_overlay_request_stream_response( - &mut self, - overlay: OverlayId, - request: BrokerOverlayRequestContentV0, - ) -> Result>, ProtocolError> { - match request { - - BrokerOverlayRequestContentV0::BlockGet(b) => self - .broker - .get_block(self.user, overlay, b.id(), b.include_children(), b.topic()) - .map(|r| Box::pin(r)), - BrokerOverlayRequestContentV0::BranchSyncReq(b) => self - .broker - .sync_branch( - self.user, - &overlay, - b.heads(), - b.known_heads(), - b.known_commits(), - ) - .map(|r| Box::pin(r)), - _ => Err(ProtocolError::InvalidState), - } - } - - async fn del_user(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {} - - async fn add_client(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {} - - async fn del_client(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {} - - async fn overlay_connect( - &mut self, - repo_link: &RepoLink, - public: bool, - ) -> Result>, ProtocolError> { - let overlay = self.process_overlay_connect(repo_link, public).await?; - Ok(OverlayConnectionClient::create(self, overlay, repo_link.clone())) - } -} - -impl<'a> BrokerConnectionLocal<'a> { - pub fn new(broker: &'a mut BrokerServer, user: PubKey) -> BrokerConnectionLocal<'a> { - BrokerConnectionLocal { broker, user } - } -} \ No newline at end of file diff --git a/p2p-broker/src/lib.rs b/p2p-broker/src/lib.rs index 6be96f1..c2cfd50 100644 --- a/p2p-broker/src/lib.rs +++ b/p2p-broker/src/lib.rs @@ -1,10 +1,3 @@ - pub mod broker_store; -pub mod connection_local; - -pub mod server; - pub mod server_ws; - -pub mod auth; diff --git a/p2p-broker/src/server.rs b/p2p-broker/src/server.rs index c90db32..4066ff3 100644 --- a/p2p-broker/src/server.rs +++ b/p2p-broker/src/server.rs @@ -16,14 +16,12 @@ use std::pin::Pin; use std::sync::Arc; use std::sync::RwLock; -use crate::auth::*; use crate::broker_store::account::Account; use crate::broker_store::config::Config; use crate::broker_store::config::ConfigMode; use crate::broker_store::overlay::Overlay; use crate::broker_store::peer::Peer; use crate::broker_store::repostoreinfo::RepoStoreInfo; -use crate::connection_local::BrokerConnectionLocal; use async_std::task; use debug_print::*; use futures::future::BoxFuture; diff --git a/p2p-broker/src/server_connection.rs b/p2p-broker/src/server_connection.rs deleted file mode 100644 index 3643acf..0000000 --- a/p2p-broker/src/server_connection.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub struct ServerConnection {} - -impl ServerConnection {} diff --git a/p2p-broker/src/server_ws.rs b/p2p-broker/src/server_ws.rs index f15b23e..9363676 100644 --- a/p2p-broker/src/server_ws.rs +++ b/p2p-broker/src/server_ws.rs @@ -12,7 +12,6 @@ //! WebSocket implementation of the Broker use crate::broker_store::config::ConfigMode; -use crate::server::*; use async_std::net::{TcpListener, TcpStream}; use async_std::sync::Mutex; use async_std::task; @@ -35,116 +34,48 @@ use std::sync::Arc; use std::{thread, time}; use tempfile::Builder; -async fn connection_loop(tcp: TcpStream, mut handler: ProtocolHandler) -> std::io::Result<()> { - let addr = tcp.peer_addr().unwrap(); - handler.register(addr); - +pub async fn accept(tcp: TcpStream, peer_priv_key: Sensitive<[u8; 32]>, peer_pub_key: PubKey) { + let sock_addr = tcp.peer_addr().unwrap(); + let ip = sock_addr.ip(); let mut ws = accept_async(tcp).await.unwrap(); - let (mut tx, mut rx) = ws.split(); - - let mut tx_mutex = Arc::new(Mutex::new(tx)); - - // setup the async frames task - let receiver = handler.async_frames_receiver(); - let ws_in_task = Arc::clone(&tx_mutex); - task::spawn(async move { - while let Ok(frame) = receiver.recv().await { - let mut sink = ws_in_task.lock().await; - if sink.send(Message::binary(frame)).await.is_err() { - break; - } - } - debug_println!("end of async frames loop"); - - let mut sink = ws_in_task.lock().await; - let _ = sink.send(Message::Close(None)).await; - let _ = sink.close().await; - }); - while let Some(msg) = rx.next().await { - //debug_println!("RCV: {:?}", msg); - let msg = match msg { - Err(e) => { - debug_println!("Error on server stream: {:?}", e); - // Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying - // transport. closing connection - break; - } - Ok(m) => m, - }; - //TODO implement PING messages - if msg.is_close() { - debug_println!("CLOSE from CLIENT"); - if let Message::Close(Some(cf)) = msg { - debug_println!("CLOSE FRAME {:?}", cf); - } else if let Message::Close(None) = msg { - debug_println!("without CLOSE FRAME"); - } - break; - } else if msg.is_binary() { - //debug_println!("server received binary: {:?}", msg); + let cws = ConnectionWebSocket {}; + let base = cws.accept(peer_priv_key, peer_pub_key, ws).await.unwrap(); - let replies = handler.handle_incoming(msg.into_data()).await; + //TODO FIXME get remote_peer_id from ConnectionBase (once it is available) + let (priv_key, pub_key) = generate_keypair(); + let remote_peer_id = pub_key; - match replies.0 { - Err(e) => { - debug_println!("Protocol Error: {:?}", e); - // dealing with ProtocolErrors (closing the connection) - break; - } - Ok(r) => { - if tx_mutex - .lock() - .await - .send(Message::binary(r)) - .await - .is_err() - { - //dealing with sending errors (closing the connection) - break; - } - } - } - match replies.1.await { - Some(errcode) => { - if errcode > 0 { - debug_println!("Close due to error code : {:?}", errcode); - //closing connection - break; - } - } - None => {} - } - } - } - handler.deregister(); - let mut sink = tx_mutex.lock().await; - let _ = sink.send(Message::Close(None)).await; - let _ = sink.close().await; - debug_println!("end of sync read+write loop"); - Ok(()) + let res = BROKER + .write() + .await + .accept(base, IP::try_from(&ip).unwrap(), None, remote_peer_id) + .await; } -pub async fn run_server_accept_one(addrs: &str) -> std::io::Result<()> { - let root = tempfile::Builder::new() - .prefix("node-daemon") - .tempdir() - .unwrap(); +pub async fn run_server_accept_one( + addrs: &str, + peer_priv_key: Sensitive<[u8; 32]>, + peer_pub_key: PubKey, +) -> std::io::Result<()> { + let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap(); let master_key: [u8; 32] = [0; 32]; std::fs::create_dir_all(root.path()).unwrap(); println!("{}", root.path().to_str().unwrap()); let store = LmdbBrokerStore::open(root.path(), master_key); - let server: BrokerServer = - BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); + // TODO: remove this part + // let server: BrokerServer = + // BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); + // let server_arc = Arc::new(server); let socket = TcpListener::bind(addrs).await?; debug_println!("Listening on {}", addrs); let mut connections = socket.incoming(); - let server_arc = Arc::new(server); + let tcp = connections.next().await.unwrap()?; - let proto_handler = Arc::clone(&server_arc).protocol_handler(); - let _handle = task::spawn(connection_loop(tcp, proto_handler)); + + accept(tcp, peer_priv_key, peer_pub_key).await; Ok(()) } @@ -154,47 +85,28 @@ pub async fn run_server( peer_priv_key: Sensitive<[u8; 32]>, peer_pub_key: PubKey, ) -> std::io::Result<()> { - let root = tempfile::Builder::new() - .prefix("node-daemon") - .tempdir() - .unwrap(); + let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap(); let master_key: [u8; 32] = [0; 32]; std::fs::create_dir_all(root.path()).unwrap(); println!("{}", root.path().to_str().unwrap()); let store = LmdbBrokerStore::open(root.path(), master_key); - let server: BrokerServer = - BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); + // TODO: remove this part + // let server: BrokerServer = + // BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); + // let server_arc = Arc::new(server); let socket = TcpListener::bind(addrs).await?; debug_println!("Listening on {}", addrs); let mut connections = socket.incoming(); - let server_arc = Arc::new(server); - while let Some(tcp) = connections.next().await { - let tcp = tcp.unwrap(); - let sock_addr = tcp.peer_addr().unwrap(); - let ip = sock_addr.ip(); - let mut ws = accept_async(tcp).await.unwrap(); - - let cws = ConnectionWebSocket {}; - let base = cws - .accept( - Sensitive::<[u8; 32]>::from_slice(peer_priv_key.deref()), - peer_pub_key, - ws, - ) - .await - .unwrap(); - //TODO FIXME get remote_peer_id from ConnectionBase (once it is available) - let (priv_key, pub_key) = generate_keypair(); - let remote_peer_id = pub_key; - - let res = BROKER - .write() - .await - .accept(base, IP::try_from(&ip).unwrap(), None, remote_peer_id) - .await; + while let Some(tcp) = connections.next().await { + accept( + tcp.unwrap(), + Sensitive::<[u8; 32]>::from_slice(peer_priv_key.deref()), + peer_pub_key, + ) + .await; } Ok(()) } diff --git a/p2p-client-ws/Cargo.toml b/p2p-client-ws/Cargo.toml index 7a0b72f..97c7638 100644 --- a/p2p-client-ws/Cargo.toml +++ b/p2p-client-ws/Cargo.toml @@ -35,4 +35,3 @@ features = ["js"] getrandom = "0.2.7" xactor = "0.7.11" async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] } -p2p-client = { path = "../p2p-client" } \ No newline at end of file diff --git a/p2p-client-ws/src/connection_ws.rs b/p2p-client-ws/src/connection_ws.rs deleted file mode 100644 index 80e22d3..0000000 --- a/p2p-client-ws/src/connection_ws.rs +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers - * All rights reserved. - * Licensed under the Apache License, Version 2.0 - * - * or the MIT license , - * at your option. All files in the project carrying such - * notice may not be copied, modified, or distributed except - * according to those terms. -*/ - -//! WebSocket Remote Connection to a Broker - -use debug_print::*; - -use futures::{future, pin_mut, stream, SinkExt, StreamExt}; -use p2p_client::connection_remote::*; -use p2p_net::broker_connection::*; -use p2p_net::errors::*; -use p2p_net::types::*; -use p2p_repo::types::*; -use p2p_repo::utils::{generate_keypair, now_timestamp}; - -use async_tungstenite::async_std::connect_async; -use async_tungstenite::client_async; -use async_tungstenite::tungstenite::{Error, Message}; - -pub struct BrokerConnectionWebSocket {} - -impl BrokerConnectionWebSocket { - pub async fn open( - url: &str, - priv_key: PrivKey, - pub_key: PubKey, - ) -> Result { - let res = connect_async(url).await; - - match (res) { - Ok((ws, _)) => { - debug_println!("WebSocket handshake completed"); - - let (write, read) = ws.split(); - let mut frames_stream_read = read.map(|msg_res| match msg_res { - Err(e) => { - debug_println!("ERROR {:?}", e); - vec![] - } - Ok(message) => { - if message.is_close() { - debug_println!("CLOSE FROM SERVER"); - vec![] - } else { - message.into_data() - } - } - }); - async fn transform(message: Vec) -> Result { - if message.len() == 0 { - debug_println!("sending CLOSE message to SERVER"); - Ok(Message::Close(None)) - } else { - Ok(Message::binary(message)) - } - } - let frames_stream_write = write - .with(|message| transform(message)) - .sink_map_err(|e| ProtocolError::WriteError); - - let master_key: [u8; 32] = [0; 32]; - let mut cnx_res = ConnectionRemote::open_broker_connection( - frames_stream_write, - frames_stream_read, - pub_key, - priv_key, - PubKey::Ed25519PubKey([1; 32]), - ) - .await; - - match cnx_res { - Ok(mut cnx) => Ok(cnx), - Err(e) => { - debug_println!("cannot connect {:?}", e); - Err(e) - } - } - } - Err(e) => { - debug_println!("Cannot connect: {:?}", e); - Err(ProtocolError::ConnectionError) - } - } - } -} diff --git a/p2p-client-ws/src/lib.rs b/p2p-client-ws/src/lib.rs index 2305aec..94b4d09 100644 --- a/p2p-client-ws/src/lib.rs +++ b/p2p-client-ws/src/lib.rs @@ -44,8 +44,6 @@ macro_rules! after { }; } -//pub mod connection_ws; - #[cfg(not(target_arch = "wasm32"))] pub mod remote_ws; diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index 1b37428..6eff679 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -61,65 +61,16 @@ impl IConnect for ConnectionWebSocket { Err(NetError::ConnectionError) } Ok((mut websocket, _)) => { - //let ws = Arc::new(Mutex::new(Box::pin(websocket))); - - // let (write, read) = ws.split(); - // let mut stream_read = read.map(|msg_res| match msg_res { - // Err(e) => { - // debug_println!("READ ERROR {:?}", e); - // ConnectionCommand::Error(NetError::IoError) - // } - // Ok(message) => { - // if message.is_close() { - // debug_println!("CLOSE FROM SERVER"); - // ConnectionCommand::Close - // } else { - // ConnectionCommand::Msg( - // serde_bare::from_slice::(&message.into_data()) - // .unwrap(), - // ) - // } - // } - // }); - // async fn write_transform(cmd: ConnectionCommand) -> Result { - // match cmd { - // ConnectionCommand::Error(_) => Err(Error::AlreadyClosed), //FIXME - // ConnectionCommand::ProtocolError(_) => Err(Error::AlreadyClosed), //FIXME - // ConnectionCommand::Close => { - // // todo close cnx. } - // Err(Error::AlreadyClosed) - // } - // ConnectionCommand::Msg(msg) => Ok(Message::binary( - // serde_bare::to_vec(&msg) - // .map_err(|_| Error::AlreadyClosed) //FIXME - // .unwrap(), - // )), - // } - // } - // let stream_write = write - // .with(|message| write_transform(message)) - // .sink_map_err(|e| NetError::IoError); - - // ws.close(Some(CloseFrame { - // code: CloseCode::Library(4000), - // reason: std::borrow::Cow::Borrowed(""), - // })) - // .await; - cnx.start_read_loop(peer_privk, Some(remote_peer)); let s = cnx.take_sender(); let r = cnx.take_receiver(); let mut shutdown = cnx.set_shutdown(); - //let ws_in_task = Arc::clone(&ws); + let join = task::spawn(async move { debug_println!("START of WS loop"); - //let w = ws_in_task.lock().await; + let res = ws_loop(websocket, s, r).await; - // .close(Some(CloseFrame { - // code: CloseCode::Library(4000), - // reason: std::borrow::Cow::Borrowed(""), - // })) - // .await; + if res.is_err() { let _ = shutdown.send(res.err().unwrap()).await; } @@ -128,37 +79,7 @@ impl IConnect for ConnectionWebSocket { cnx.start(config).await; - //spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver())); - - // - - //cnx.close().await; - - //// let res = cnx.join_shutdown().await; - //// log!("JOIN SHUTDOWN {:?}", res); - // cnx.send(ConnectionCommand::Close).await; - - // cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start( - // StartProtocol::Auth(ClientHello::V0()), - // ))) - // .await; - - //cnx.close().await; - - // let _ = cnx.inject(last_command).await; - // let _ = cnx.close_streams().await; - - // Note that since WsMeta::connect resolves to an opened connection, we don't see - // any Open events here. - // - //assert!(evts.next().await.unwrap_throw().is_closing()); - - // TODO wait for close - - //log!("WS closed {:?}", last_event.clone()); - Ok(cnx) - //Ok(()) } } } @@ -326,7 +247,6 @@ async fn ws_loop( return Err(e); } } - //log!("END OF LOOP"); Ok(()) } diff --git a/p2p-client-ws/src/remote_ws_wasm.rs b/p2p-client-ws/src/remote_ws_wasm.rs index 3b4c0be..5862a6f 100644 --- a/p2p-client-ws/src/remote_ws_wasm.rs +++ b/p2p-client-ws/src/remote_ws_wasm.rs @@ -42,7 +42,6 @@ impl IConnect for ConnectionWebSocket { remote_peer: DirectPeerId, config: StartConfig, ) -> Result { - //pub async fn testt(url: &str) -> ResultSend<()> { let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let url = format!("ws://{}:{}", ip, WS_PORT); @@ -52,9 +51,6 @@ impl IConnect for ConnectionWebSocket { NetError::ConnectionError })?; - //let (mut sender_tx, sender_rx) = mpsc::unbounded(); - //let (mut receiver_tx, receiver_rx) = mpsc::unbounded(); - cnx.start_read_loop(peer_privk, Some(remote_peer)); let mut shutdown = cnx.set_shutdown(); @@ -68,32 +64,6 @@ impl IConnect for ConnectionWebSocket { cnx.start(config).await; - //spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone())); - - //cnx.close().await; - // spawn_and_log_error(async move { - // TimeoutFuture::new(10_000).await; - // cnx.close().await; - - // Ok(()) - // // // Do something here after the one second timeout is up! - // }); - // cnx.send(ConnectionCommand::Close).await; - - // cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start( - // StartProtocol::Auth(ClientHello::V0()), - // ))) - // .await; - //log!("waiting..."); - //let res = join.next().await; - //log!("finished..."); - //log!("JOIN SHUTDOWN {:?}", res); - // Note that since WsMeta::connect resolves to an opened connection, we don't see - // any Open events here. - // - //assert!(events.next().await.unwrap_throw().is_closing()); - - //Ok(cnx) Ok(cnx) } } @@ -184,7 +154,6 @@ async fn ws_loop( } } - log!("waiting for closing event"); let last_event = events.next().await; log!("WS closed {:?}", last_event.clone()); let last_command = match last_event { diff --git a/p2p-client/Cargo.toml b/p2p-client/Cargo.toml deleted file mode 100644 index 9b7952b..0000000 --- a/p2p-client/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "p2p-client" -version = "0.1.0" -edition = "2021" -license = "MIT/Apache-2.0" -authors = ["Niko PLP "] -description = "P2P Client module of NextGraph" -repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs" - -[dependencies] -debug_print = "1.0.0" -p2p-repo = { path = "../p2p-repo" } -p2p-net = { path = "../p2p-net" } -chacha20 = "0.9.0" -serde = { version = "1.0", features = ["derive"] } -serde_bare = "0.5.0" -serde_bytes = "0.11.7" -xactor = "0.7.11" -async-trait = "0.1.64" -async-std = { version = "1.12.0", features = ["attributes"] } -futures = "0.3.24" -async-channel = "1.7.1" -async-oneshot = "0.5.0" -snow = "0.9.2" - diff --git a/p2p-client/src/connection_remote.rs b/p2p-client/src/connection_remote.rs deleted file mode 100644 index 4a0dff0..0000000 --- a/p2p-client/src/connection_remote.rs +++ /dev/null @@ -1,598 +0,0 @@ -/* - * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers - * All rights reserved. - * Licensed under the Apache License, Version 2.0 - * - * or the MIT license , - * at your option. All files in the project carrying such - * notice may not be copied, modified, or distributed except - * according to those terms. -*/ - -//! Remote Connection to a Broker - -use async_std::task; -use async_std::sync::Mutex; -use futures::{ - ready, - stream::Stream, - task::{Context, Poll}, - Future, - select, FutureExt, -}; -use futures::channel::mpsc; -use std::pin::Pin; -use std::{collections::HashSet, fmt::Debug}; - -use async_oneshot::oneshot; -use debug_print::*; -use futures::{pin_mut, stream, Sink, SinkExt, StreamExt}; -use p2p_repo::object::*; -use p2p_repo::store::*; -use p2p_repo::types::*; -use p2p_repo::utils::*; -use p2p_net::errors::*; -use p2p_net::types::*; -use p2p_net::broker_connection::*; -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; -use xactor::{message, spawn, Actor, Addr, Handler, WeakAddr}; - - -#[message] -struct BrokerMessageXActor(BrokerMessage); - -struct BrokerMessageActor { - r: Option>, - s: async_oneshot::Sender, -} - -impl Actor for BrokerMessageActor {} - -impl BrokerMessageActor { - fn new() -> BrokerMessageActor { - let (s, r) = oneshot::(); - BrokerMessageActor { r: Some(r), s } - } - fn resolve(&mut self, msg: BrokerMessage) { - let _ = self.s.send(msg); - } - - fn receiver(&mut self) -> async_oneshot::Receiver { - self.r.take().unwrap() - } -} - -struct BrokerMessageStreamActor { - r: Option>, - s: async_channel::Sender, - error_r: Option>>, - error_s: Option>>, -} - -impl Actor for BrokerMessageStreamActor {} - -impl BrokerMessageStreamActor { - fn new() -> BrokerMessageStreamActor { - let (s, r) = async_channel::unbounded::(); - let (error_s, error_r) = oneshot::>(); - BrokerMessageStreamActor { - r: Some(r), - s, - error_r: Some(error_r), - error_s: Some(error_s), - } - } - async fn partial(&mut self, block: Block) -> Result<(), ProtocolError> { - //debug_println!("GOT PARTIAL {:?}", block.id()); - self.s - .send(block) - .await - .map_err(|e| ProtocolError::WriteError) - } - - fn receiver(&mut self) -> async_channel::Receiver { - self.r.take().unwrap() - } - - fn error_receiver(&mut self) -> async_oneshot::Receiver> { - self.error_r.take().unwrap() - } - - fn send_error(&mut self, err: Option) { - if self.error_s.is_some() { - let _ = self.error_s.take().unwrap().send(err); - self.error_s = None; - } - } - - fn close(&mut self) { - self.s.close(); - } -} - -#[async_trait::async_trait] -impl Handler for BrokerMessageActor { - async fn handle(&mut self, ctx: &mut xactor::Context, msg: BrokerMessageXActor) { - //println!("handling {:?}", msg.0); - self.resolve(msg.0); - ctx.stop(None); - } -} - -#[async_trait::async_trait] -impl Handler for BrokerMessageStreamActor { - async fn handle(&mut self, ctx: &mut xactor::Context, msg: BrokerMessageXActor) { - //println!("handling {:?}", msg.0); - let res: Result, ProtocolError> = msg.0.into(); - match res { - Err(e) => { - self.send_error(Some(e)); - ctx.stop(None); - self.close(); - } - Ok(Some(b)) => { - self.send_error(None); - // it must be a partial content - let res = self.partial(b).await; - if let Err(e) = res { - ctx.stop(None); - self.close(); - } - } - Ok(None) => { - self.send_error(None); - ctx.stop(None); - self.close(); - } - } - } -} - -pub struct ConnectionRemote {} - -impl ConnectionRemote { - pub async fn ext_request< - B: Stream> + StreamExt + Send + Sync, - A: Sink, Error = ProtocolError> + Send, - >( - w: A, - r: B, - request: ExtRequest, - ) -> Result { - unimplemented!(); - } - - async fn close(w: S, err: ProtocolError) -> ProtocolError - where - S: Sink, Error = ProtocolError>, - { - let mut writer = Box::pin(w); - let _ = writer.send(vec![]); - let _ = writer.close().await; - err - } - - pub async fn open_broker_connection< - B: Stream> + StreamExt + Send + Sync + 'static, - A: Sink, Error = ProtocolError> + Send + 'static, - >( - w: A, - r: B, - user: PubKey, - user_pk: PrivKey, - client: PubKey, - ) -> Result { - let mut writer = Box::pin(w); - writer - .send(serde_bare::to_vec(&StartProtocol::Auth(ClientHello::V0()))?) - .await - .map_err(|_e| ProtocolError::WriteError)?; - - let mut reader = Box::pin(r); - let answer = reader.next().await; - if answer.is_none() { - return Err(Self::close(writer, ProtocolError::InvalidState).await); - } - - let server_hello = serde_bare::from_slice::(&answer.unwrap())?; - - //debug_println!("received nonce from server: {:?}", server_hello.nonce()); - - let content = ClientAuthContentV0 { - user, - client, - nonce: server_hello.nonce().clone(), - }; - - let sig = sign(user_pk, user, &serde_bare::to_vec(&content)?) - .map_err(|_e| ProtocolError::SignatureError)?; - - let auth_ser = serde_bare::to_vec(&ClientAuth::V0(ClientAuthV0 { content, sig }))?; - //debug_println!("AUTH SENT {:?}", auth_ser); - writer - .send(auth_ser) - .await - .map_err(|_e| ProtocolError::WriteError)?; - - let answer = reader.next().await; - if answer.is_none() { - //return Err(ProtocolError::InvalidState); - return Err(Self::close(writer, ProtocolError::InvalidState).await); - } - - let auth_result = serde_bare::from_slice::(&answer.unwrap())?; - - match auth_result.result() { - 0 => { - async fn transform(message: BrokerMessage) -> Result, ProtocolError> { - if message.is_close() { - Ok(vec![]) - } else { - Ok(serde_bare::to_vec(&message)?) - } - } - let messages_stream_write = writer.with(|message| transform(message)); - - let mut messages_stream_read = reader.map(|message| { - if message.len() == 0 { - BrokerMessage::Close - } else { - match serde_bare::from_slice::(&message) { - Err(e) => BrokerMessage::Close, - Ok(m) => m - } - } - }); - - let cnx = - BrokerConnectionRemote::open(messages_stream_write, messages_stream_read, user); - - Ok(cnx) - } - err => Err(Self::close(writer, ProtocolError::try_from(err).unwrap()).await), - } - } -} - -pub struct BrokerConnectionRemote -where - T: Sink + Send + 'static, -{ - writer: Arc>>>, - user: PubKey, - actors: Arc>>>, - stream_actors: Arc>>>, - shutdown: mpsc::UnboundedSender, -} - -#[async_trait::async_trait] -impl BrokerConnection for BrokerConnectionRemote -where - T: Sink + Send, -{ - type OC = BrokerConnectionRemote; - type BlockStream = async_channel::Receiver; - - async fn close(&mut self) { - let _ = self.shutdown.close().await; - let mut w = self.writer.lock().await; - let _ = w.send(BrokerMessage::Close).await; - let _ = w.close().await; - } - - async fn process_overlay_request_stream_response( - &mut self, - overlay: OverlayId, - request: BrokerOverlayRequestContentV0, - ) -> Result>, ProtocolError> { - let mut actor = BrokerMessageStreamActor::new(); - let receiver = actor.receiver(); - let error_receiver = actor.error_receiver(); - let mut addr = actor - .start() - .await - .map_err(|_e| ProtocolError::ActorError)?; - - let request_id = addr.actor_id(); - //debug_println!("actor ID {}", request_id); - - { - let mut map = self.stream_actors.write().expect("RwLock poisoned"); - map.insert(request_id, addr.downgrade()); - } - - let mut w = self.writer.lock().await; - w.send(BrokerMessage::V0(BrokerMessageV0 { - padding: vec![], //FIXME implement padding - content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0( - BrokerOverlayMessageV0 { - overlay, - content: BrokerOverlayMessageContentV0::BrokerOverlayRequest( - BrokerOverlayRequest::V0(BrokerOverlayRequestV0 { - id: request_id, - content: request, - }), - ), - }, - )), - })) - .await - .map_err(|_e| ProtocolError::WriteError)?; - - //debug_println!("waiting for first reply"); - let reply = error_receiver.await; - match reply { - Err(_e) => { - Err(ProtocolError::Closing) - } - Ok(Some(e)) => { - let mut map = self.stream_actors.write().expect("RwLock poisoned"); - map.remove(&request_id); - return Err(e); - } - Ok(None) => { - let stream_actors_in_thread = Arc::clone(&self.stream_actors); - task::spawn(async move { - addr.wait_for_stop().await; // TODO add timeout - let mut map = stream_actors_in_thread.write().expect("RwLock poisoned"); - map.remove(&request_id); - }); - - Ok(Box::pin(receiver)) - } - } - } - - async fn process_overlay_request_objectid_response( - &mut self, - overlay: OverlayId, - request: BrokerOverlayRequestContentV0, - ) -> Result { - before!(self, request_id, addr, receiver); - - self.writer.lock().await - .send(BrokerMessage::V0(BrokerMessageV0 { - padding: vec![], // FIXME implement padding - content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0( - BrokerOverlayMessageV0 { - overlay, - content: BrokerOverlayMessageContentV0::BrokerOverlayRequest( - BrokerOverlayRequest::V0(BrokerOverlayRequestV0 { - id: request_id, - content: request, - }), - ), - }, - )), - })) - .await - .map_err(|_e| ProtocolError::WriteError)?; - - after!(self, request_id, addr, receiver, reply); - reply.into() - } - - async fn process_overlay_request( - &mut self, - overlay: OverlayId, - request: BrokerOverlayRequestContentV0, - ) -> Result<(), ProtocolError> { - before!(self, request_id, addr, receiver); - - self.writer.lock().await - .send(BrokerMessage::V0(BrokerMessageV0 { - padding: vec![], // FIXME implement padding - content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0( - BrokerOverlayMessageV0 { - overlay, - content: BrokerOverlayMessageContentV0::BrokerOverlayRequest( - BrokerOverlayRequest::V0(BrokerOverlayRequestV0 { - id: request_id, - content: request, - }), - ), - }, - )), - })) - .await - .map_err(|_e| ProtocolError::WriteError)?; - - after!(self, request_id, addr, receiver, reply); - reply.into() - } - - async fn add_user( - &mut self, - user_id: PubKey, - admin_user_pk: PrivKey, - ) -> Result<(), ProtocolError> { - before!(self, request_id, addr, receiver); - - let op_content = AddUserContentV0 { user: user_id }; - - let sig = sign( - admin_user_pk, - self.user, - &serde_bare::to_vec(&op_content)?, - )?; - - self.writer.lock().await - .send(BrokerMessage::V0(BrokerMessageV0 { - padding: vec![], // TODO implement padding - content: BrokerMessageContentV0::BrokerRequest(BrokerRequest::V0( - BrokerRequestV0 { - id: request_id, - content: BrokerRequestContentV0::AddUser(AddUser::V0(AddUserV0 { - content: op_content, - sig, - })), - }, - )), - })) - .await - .map_err(|_e| ProtocolError::WriteError)?; - - after!(self, request_id, addr, receiver, reply); - reply.into() - } - - async fn del_user(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {} - - async fn add_client(&mut self, client_id: ClientId, user_pk: PrivKey) {} - - async fn del_client(&mut self, client_id: ClientId, user_pk: PrivKey) {} - - async fn overlay_connect( - &mut self, - repo_link: &RepoLink, - public: bool, - ) -> Result>, ProtocolError> { - let overlay = self.process_overlay_connect(repo_link, public).await?; - - Ok(OverlayConnectionClient::create(self, overlay,repo_link.clone() )) - } -} - -#[derive(Debug)] -enum Void {} - -impl BrokerConnectionRemote -where - T: Sink + Send, -{ - async fn connection_reader_loop< - U: Stream + StreamExt + Send + Sync + Unpin + 'static, - >( - stream: U, - actors: Arc>>>, - stream_actors: Arc>>>, - shutdown: mpsc::UnboundedReceiver, - ) -> Result<(), ProtocolError> { - let mut s = stream.fuse(); - let mut shutdown = shutdown.fuse(); - loop { - select! { - void = shutdown.next().fuse() => match void { - Some(void) => match void {}, - None => break, - }, - message = s.next().fuse() => match message { - Some(message) => - { - //debug_println!("GOT MESSAGE {:?}", message); - - if message.is_close() { - // releasing the blocking calls on the actors - - let map = actors.read().expect("RwLock poisoned"); - for (a) in map.values() { - if let Some(mut addr) = a.upgrade() { - let _ = addr.stop(Some(ProtocolError::Closing.into())); - } - } - let map2 = stream_actors.read().expect("RwLock poisoned"); - for (a) in map2.values() { - if let Some(mut addr) = a.upgrade() { - let _ = addr.stop(Some(ProtocolError::Closing.into())); - } - } - return Err(ProtocolError::Closing); - } - - if message.is_request() { - debug_println!("is request {}", message.id()); - // closing connection. a client is not supposed to receive requests. - return Err(ProtocolError::Closing); - - } else if message.is_response() { - let id = message.id(); - //debug_println!("is response for {}", id); - { - let map = actors.read().expect("RwLock poisoned"); - match map.get(&id) { - Some(weak_addr) => match weak_addr.upgrade() { - Some(addr) => { - addr.send(BrokerMessageXActor(message)) - .map_err(|e| ProtocolError::Closing)? - //.expect("sending message back to actor failed"); - } - None => { - debug_println!("ERROR. Addr is dead for ID {}", id); - return Err(ProtocolError::Closing); - } - }, - None => { - let map2 = stream_actors.read().expect("RwLock poisoned"); - match map2.get(&id) { - Some(weak_addr) => match weak_addr.upgrade() { - Some(addr) => { - addr.send(BrokerMessageXActor(message)) - .map_err(|e| ProtocolError::Closing)? - //.expect("sending message back to stream actor failed"); - } - None => { - debug_println!( - "ERROR. Addr is dead for ID {} {:?}", - id, - message - ); - return Err(ProtocolError::Closing); - } - }, - None => { - debug_println!("Actor ID not found {} {:?}", id, message); - return Err(ProtocolError::Closing); - } - } - } - } - } - } - }, - None => break, - } - } - } - Ok(()) - } - - pub fn open + StreamExt + Send + Sync + Unpin + 'static>( - writer: T, - reader: U, - user: PubKey, - ) -> BrokerConnectionRemote { - let actors: Arc>>> = - Arc::new(RwLock::new(HashMap::new())); - - let stream_actors: Arc>>> = - Arc::new(RwLock::new(HashMap::new())); - - let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); - - let w = Arc::new(Mutex::new(Box::pin(writer))); - let ws_in_task = Arc::clone(&w); - - let actors_in_thread = Arc::clone(&actors); - let stream_actors_in_thread = Arc::clone(&stream_actors); - task::spawn(async move { - debug_println!("START of reader loop"); - if let Err(e) = - Self::connection_reader_loop(reader, actors_in_thread, stream_actors_in_thread, shutdown_receiver) - .await - { - debug_println!("closing because of {}", e); - let _ = ws_in_task.lock().await.close().await; - } - debug_println!("END of reader loop"); - }); - - BrokerConnectionRemote:: { - writer: Arc::clone(&w), - user, - actors: Arc::clone(&actors), - stream_actors: Arc::clone(&stream_actors), - shutdown:shutdown_sender , - } - } -} \ No newline at end of file diff --git a/p2p-client/src/lib.rs b/p2p-client/src/lib.rs deleted file mode 100644 index 321870a..0000000 --- a/p2p-client/src/lib.rs +++ /dev/null @@ -1,47 +0,0 @@ -// All rights reserved. -// Licensed under the Apache License, Version 2.0 -// -// or the MIT license , -// at your option. All files in the project carrying such -// notice may not be copied, modified, or distributed except -// according to those terms. - -#[macro_export] -macro_rules! before { - ( $self:expr, $request_id:ident, $addr:ident, $receiver:ident ) => { - let mut actor = BrokerMessageActor::new(); - let $receiver = actor.receiver(); - let mut $addr = actor - .start() - .await - .map_err(|_e| ProtocolError::ActorError)?; - - let $request_id = $addr.actor_id(); - //debug_println!("actor ID {}", $request_id); - - { - let mut map = $self.actors.write().expect("RwLock poisoned"); - map.insert($request_id, $addr.downgrade()); - } - }; -} - -macro_rules! after { - ( $self:expr, $request_id:ident, $addr:ident, $receiver:ident, $reply:ident ) => { - //debug_println!("waiting for reply"); - - $addr.wait_for_stop().await; // TODO add timeout and close connection if there's no reply - let r = $receiver.await; - if r.is_err() { - return Err(ProtocolError::Closing); - } - let $reply = r.unwrap(); - //debug_println!("reply arrived {:?}", $reply); - { - let mut map = $self.actors.write().expect("RwLock poisoned"); - map.remove(&$request_id); - } - }; -} - -//pub mod connection_remote; diff --git a/p2p-net/src/actor.rs b/p2p-net/src/actor.rs index 3100eb0..7de7188 100644 --- a/p2p-net/src/actor.rs +++ b/p2p-net/src/actor.rs @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + use async_std::stream::StreamExt; use async_std::sync::{Mutex, MutexGuard}; use futures::{channel::mpsc, SinkExt}; @@ -10,12 +21,6 @@ use crate::utils::{spawn_and_log_error, Receiver, ResultSend, Sender}; use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage}; use std::marker::PhantomData; -// pub trait BrokerRequest: std::fmt::Debug { -// fn send(&self) -> ProtocolMessage; -// } - -//pub trait BrokerResponse: TryFrom + std::fmt::Debug {} - impl TryFrom for () { type Error = ProtocolError; fn try_from(msg: ProtocolMessage) -> Result { @@ -25,12 +30,9 @@ impl TryFrom for () { #[async_trait::async_trait] pub trait EActor: Send + Sync + std::fmt::Debug { - //type T: TryFrom + std::fmt::Debug; - //async fn handle(&mut self, msg: ProtocolMessage); async fn respond( &mut self, msg: ProtocolMessage, - //stream: Option, fsm: Arc>, ) -> Result<(), ProtocolError>; } @@ -49,37 +51,6 @@ pub struct Actor< initiator: bool, } -// #[async_trait::async_trait] -// impl< -// A: BrokerRequest + std::marker::Sync + 'static, -// B: TryFrom -// + std::fmt::Debug -// + std::marker::Sync -// + 'static, -// > EActor for Actor<'_, A, B> -// { -// //type T = B; - -// // async fn handle(&mut self, msg: ProtocolMessage) { -// // if self.initiator && msg.type_id() == TypeId::of::() -// // || !self.initiator && msg.type_id() == TypeId::of::() -// // { -// // let _ = self.receiver_tx.send(ConnectionCommand::Msg(msg)).await; -// // } else { -// // log!("NOT OK"); -// // } -// // } - -// // async fn respond(id: i64, msg: A) -> Result { -// // let mut actor = Box::new(Actor::::new(id, false)); -// // //actor.process_request -// // match self.receiver.next().await { -// // Some(msg) => B::receive(msg), -// // _ => Err(ProtocolError::ActorError), -// // } -// // } -// } - pub enum SoS { Single(B), Stream(Receiver), @@ -139,10 +110,8 @@ impl< pub async fn request( &mut self, msg: ProtocolMessage, - //stream: Option, fsm: Arc>, ) -> Result, ProtocolError> { - //sender.send(ConnectionCommand::Msg(msg.send())).await; fsm.lock().await.send(msg).await?; let mut receiver = self.receiver.take().unwrap(); match receiver.next().await { diff --git a/p2p-net/src/actors/noise.rs b/p2p-net/src/actors/noise.rs index 0c97f18..fe5c457 100644 --- a/p2p-net/src/actors/noise.rs +++ b/p2p-net/src/actors/noise.rs @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + use std::sync::Arc; use crate::{actor::*, connection::NoiseFSM, errors::ProtocolError, types::ProtocolMessage}; @@ -24,12 +35,6 @@ impl Noise { } } -// impl BrokerRequest for Noise { -// fn send(&self) -> ProtocolMessage { -// ProtocolMessage::Noise(self.clone()) -// } -// } - impl From for ProtocolMessage { fn from(msg: Noise) -> ProtocolMessage { ProtocolMessage::Noise(msg) diff --git a/p2p-net/src/actors/start.rs b/p2p-net/src/actors/start.rs index fc59914..1fdf44b 100644 --- a/p2p-net/src/actors/start.rs +++ b/p2p-net/src/actors/start.rs @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + use crate::actors::noise::Noise; use crate::connection::NoiseFSM; use crate::types::ExtResponse; @@ -102,12 +113,6 @@ impl ServerHello { } } -// impl BrokerRequest for ClientHello { -// fn send(&self) -> ProtocolMessage { -// ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local)) -// } -// } - impl From for ProtocolMessage { fn from(msg: ClientHello) -> ProtocolMessage { ProtocolMessage::Start(StartProtocol::Client(msg)) @@ -136,12 +141,6 @@ impl TryFrom for ServerHello { } } -// impl BrokerRequest for ServerHello { -// fn send(&self) -> ProtocolMessage { -// ProtocolMessage::ServerHello(self.clone()) -// } -// } - impl From for ProtocolMessage { fn from(msg: ServerHello) -> ProtocolMessage { ProtocolMessage::ServerHello(msg) diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index 8dd080f..597ea63 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + use crate::actor::*; use crate::connection::*; use crate::errors::*; @@ -225,10 +236,8 @@ impl Broker { } // TODO check that not already connected to peer - //IpAddr::from_str("127.0.0.1"); - //cnx.open(url, peer_pubk, peer_privk).await?; - //let cnx = Arc::new(); - //let (priv_key, pub_key) = generate_keypair(); + // IpAddr::from_str("127.0.0.1"); + log!("CONNECTING"); let mut connection = cnx .open( diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 1e39981..46e3fd5 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -1,4 +1,15 @@ -static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b"; +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + +//static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b"; use std::collections::HashMap; use std::fmt; diff --git a/p2p-net/src/lib.rs b/p2p-net/src/lib.rs index 1bf94ee..44efd78 100644 --- a/p2p-net/src/lib.rs +++ b/p2p-net/src/lib.rs @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + pub mod types; pub mod errors; diff --git a/p2p-net/src/utils.rs b/p2p-net/src/utils.rs index 1c0345c..e2dc2d2 100644 --- a/p2p-net/src/utils.rs +++ b/p2p-net/src/utils.rs @@ -1,3 +1,14 @@ +/* + * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + use crate::log; use async_std::task; use futures::{channel::mpsc, select, Future, FutureExt, SinkExt};