some cleanup and license headers

pull/19/head
Niko 2 years ago
parent 9a4446871a
commit 39764bfa0e
  1. 87
      Cargo.lock
  2. 2
      Cargo.toml
  3. 2
      README.md
  4. 6
      ng-app-js/Cargo.toml
  5. 3
      ng-app-js/src/client_connection.rs
  6. 38
      ng-app-js/src/lib.rs
  7. 1
      ngcli/Cargo.toml
  8. 55
      ngcli/src/main.rs
  9. 180
      p2p-broker/src/auth.rs
  10. 147
      p2p-broker/src/connection_local.rs
  11. 7
      p2p-broker/src/lib.rs
  12. 2
      p2p-broker/src/server.rs
  13. 3
      p2p-broker/src/server_connection.rs
  14. 154
      p2p-broker/src/server_ws.rs
  15. 1
      p2p-client-ws/Cargo.toml
  16. 93
      p2p-client-ws/src/connection_ws.rs
  17. 2
      p2p-client-ws/src/lib.rs
  18. 86
      p2p-client-ws/src/remote_ws.rs
  19. 31
      p2p-client-ws/src/remote_ws_wasm.rs
  20. 25
      p2p-client/Cargo.toml
  21. 598
      p2p-client/src/connection_remote.rs
  22. 47
      p2p-client/src/lib.rs
  23. 53
      p2p-net/src/actor.rs
  24. 17
      p2p-net/src/actors/noise.rs
  25. 23
      p2p-net/src/actors/start.rs
  26. 17
      p2p-net/src/broker.rs
  27. 13
      p2p-net/src/connection.rs
  28. 11
      p2p-net/src/lib.rs
  29. 11
      p2p-net/src/utils.rs

87
Cargo.lock generated

@ -554,20 +554,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "curve25519-dalek"
version = "4.0.0-rc.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d4ba9852b42210c7538b75484f9daa0655e9a3ac04f693747bb0f02cf3cfe16"
dependencies = [
"cfg-if",
"fiat-crypto",
"packed_simd_2",
"platforms",
"subtle",
"zeroize",
]
[[package]]
name = "debug_print"
version = "1.0.0"
@ -621,7 +607,7 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d"
dependencies = [
"curve25519-dalek 3.2.1",
"curve25519-dalek",
"ed25519",
"rand 0.7.3",
"serde",
@ -689,12 +675,6 @@ dependencies = [
"instant",
]
[[package]]
name = "fiat-crypto"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93ace6ec7cc19c8ed33a32eaa9ea692d7faea05006b5356b9e2b668ec4bc3955"
[[package]]
name = "fnv"
version = "1.0.7"
@ -1009,12 +989,6 @@ version = "0.2.140"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c"
[[package]]
name = "libm"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a"
[[package]]
name = "linux-raw-sys"
version = "0.1.4"
@ -1077,7 +1051,6 @@ dependencies = [
"futures",
"getrandom 0.1.16",
"gloo-timers",
"js-sys",
"p2p-client-ws",
"p2p-net",
"p2p-repo",
@ -1085,11 +1058,9 @@ dependencies = [
"serde",
"serde_bare",
"serde_bytes",
"snow",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test",
"web-sys",
"ws_stream_wasm",
]
@ -1104,7 +1075,6 @@ dependencies = [
"fastbloom-rs",
"futures",
"p2p-broker",
"p2p-client",
"p2p-client-ws",
"p2p-net",
"p2p-repo",
@ -1224,26 +1194,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "p2p-client"
version = "0.1.0"
dependencies = [
"async-channel",
"async-oneshot",
"async-std",
"async-trait",
"chacha20 0.9.0",
"debug_print",
"futures",
"p2p-net",
"p2p-repo",
"serde",
"serde_bare",
"serde_bytes",
"snow",
"xactor",
]
[[package]]
name = "p2p-client-ws"
version = "0.1.0"
@ -1257,7 +1207,6 @@ dependencies = [
"debug_print",
"futures",
"getrandom 0.2.8",
"p2p-client",
"p2p-net",
"p2p-repo",
"pharos",
@ -1336,16 +1285,6 @@ dependencies = [
"serde_bytes",
]
[[package]]
name = "packed_simd_2"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1914cd452d8fccd6f9db48147b29fd4ae05bea9dc5d9ad578509f72415de282"
dependencies = [
"cfg-if",
"libm",
]
[[package]]
name = "parking"
version = "2.0.0"
@ -1415,12 +1354,6 @@ version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
[[package]]
name = "platforms"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3d7ddaed09e0eb771a79ab0fd64609ba0afb0a8366421957936ad14cbd13630"
[[package]]
name = "polling"
version = "2.5.2"
@ -1807,22 +1740,6 @@ version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
[[package]]
name = "snow"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ccba027ba85743e09d15c03296797cad56395089b832b48b5a5217880f57733"
dependencies = [
"aes-gcm",
"blake2",
"chacha20poly1305",
"curve25519-dalek 4.0.0-rc.1",
"rand_core 0.6.4",
"rustc_version",
"sha2 0.10.6",
"subtle",
]
[[package]]
name = "socket2"
version = "0.4.9"
@ -2327,7 +2244,7 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077"
dependencies = [
"curve25519-dalek 3.2.1",
"curve25519-dalek",
"rand_core 0.5.1",
"zeroize",
]

@ -3,7 +3,7 @@ members = [
"p2p-repo",
"p2p-net",
"p2p-broker",
"p2p-client",
"p2p-client-ws",
"p2p-verifier",
"p2p-stores-lmdb",
"ngcli",

@ -46,7 +46,7 @@ The crates are organized as follow :
- p2p-repo : all the common types, traits and structs for the P2P repositories
- p2p-net : all the common types, traits and structs for the P2P networks
- p2p-broker : the broker code (as server and core peer)
- p2p-client : the client connecting to a broker, used by the apps and verifier
- p2p-client-ws : the client connecting to a broker, used by the apps and verifier
- p2p-stores-lmdb : lmdb backed stores for the p2p layer
- p2p-verifier : the code of the verifier
- ngcli : CLI tool to manipulate the repos

@ -26,7 +26,7 @@ debug_print = "1.0.0"
serde = { version = "1.0", features = ["derive"] }
serde_bare = "0.5.0"
serde_bytes = "0.11.7"
snow = "0.9.2"
# snow = "0.9.2"
getrandom = { version = "0.1.1", features = ["wasm-bindgen"] }
@ -35,9 +35,9 @@ getrandom = { version = "0.1.1", features = ["wasm-bindgen"] }
# features = ["js"]
[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3.61"
# js-sys = "0.3.61"
wasm-bindgen-futures = "0.4.34"
web-sys = { version = "0.3.61", features = ["Window"] }
# web-sys = { version = "0.3.61", features = ["Window"] }
gloo-timers = "0.2.6"
[dev-dependencies]

@ -1,3 +0,0 @@
pub struct ClientConnection {}
impl ClientConnection {}

@ -1,6 +1,17 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use async_std::task;
#[cfg(target_arch = "wasm32")]
use js_sys::Reflect;
// #[cfg(target_arch = "wasm32")]
// use js_sys::Reflect;
#[cfg(target_arch = "wasm32")]
use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket;
use p2p_net::broker::*;
@ -22,13 +33,11 @@ extern "C" {
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn greet(name: &str) {
log!("I say: {}", name);
let mut random_buf = [0u8; 32];
getrandom::getrandom(&mut random_buf).unwrap();
pub async fn start() {
// let mut random_buf = [0u8; 32];
// getrandom::getrandom(&mut random_buf).unwrap();
//spawn_and_log_error(testt("ws://127.0.0.1:3012"));
async fn method() -> ResultSend<()> {
async fn inner_task() -> ResultSend<()> {
let server_key = PubKey::Ed25519PubKey([
22, 140, 190, 111, 82, 151, 27, 133, 83, 121, 71, 36, 209, 53, 53, 114, 52, 254, 218,
241, 52, 155, 231, 83, 188, 189, 47, 135, 105, 213, 39, 91,
@ -88,14 +97,13 @@ pub async fn greet(name: &str) {
Ok(())
}
spawn_and_log_error(method()).await;
//spawn_and_log_error(Arc::clone(&cnx).open("ws://127.0.0.1:3012", priv_key, pub_key));
spawn_and_log_error(inner_task()).await;
}
#[cfg(not(target_arch = "wasm32"))]
#[wasm_bindgen]
pub fn greet(name: &str) {
alert(&format!("I say: {}", name));
pub fn start() {
//alert(&format!("I say: {}", name));
task::spawn(async move {});
}
@ -111,10 +119,10 @@ pub fn change(name: &str) -> JsValue {
mod test {
use wasm_bindgen_test::*;
wasm_bindgen_test_configure!(run_in_browser);
use crate::greet;
use crate::start;
#[wasm_bindgen_test]
pub async fn test_greet() {
greet("test").await;
pub async fn test_connection() {
start().await;
}
}

@ -12,7 +12,6 @@ debug_print = "1.0.0"
p2p-repo = { path = "../p2p-repo" }
p2p-net = { path = "../p2p-net" }
p2p-client-ws = { path = "../p2p-client-ws" }
p2p-client = { path = "../p2p-client" }
p2p-broker = { path = "../p2p-broker" }
p2p-stores-lmdb = { path = "../p2p-stores-lmdb" }
async-std = { version = "1.12.0", features = ["attributes"] }

@ -13,23 +13,22 @@ use debug_print::*;
use ed25519_dalek::*;
use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership};
use futures::{future, pin_mut, stream, SinkExt, StreamExt};
use p2p_broker::broker_store::config::ConfigMode;
use p2p_repo::object::Object;
use p2p_repo::store::{store_max_value_size, store_valid_value_size, HashMapRepoStore, RepoStore};
use p2p_broker::broker_store::config::ConfigMode;
use p2p_stores_lmdb::broker_store::LmdbBrokerStore;
use p2p_stores_lmdb::repo_store::LmdbRepoStore;
use rand::rngs::OsRng;
use std::collections::HashMap;
use p2p_repo::types::*;
use p2p_repo::utils::{generate_keypair, now_timestamp};
use p2p_broker::server_ws::*;
use p2p_broker::server::*;
use p2p_broker::server_ws::*;
use p2p_net::broker_connection::*;
use p2p_net::errors::*;
use p2p_net::types::*;
use p2p_net::broker_connection::*;
use p2p_client::connection_remote::*;
use p2p_client_ws::connection_ws::*;
use p2p_repo::types::*;
use p2p_repo::utils::{generate_keypair, now_timestamp};
use p2p_broker::connection_local::*;
fn block_size() -> usize {
@ -450,15 +449,22 @@ async fn test_sync(cnx: &mut impl BrokerConnection, user_pub_key: PubKey, userpr
// now the client can verify the DAG and each commit. Then update its list of heads.
}
async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKey) -> Result<(), ProtocolError>{
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key).await?;
async fn test(
cnx: &mut impl BrokerConnection,
pub_key: PubKey,
priv_key: PrivKey,
) -> Result<(), ProtocolError> {
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key)
.await?;
cnx.add_user(pub_key, priv_key).await?;
//.expect("add_user 2 (myself) failed");
assert_eq!(
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key).await.err().unwrap(),
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key)
.await
.err()
.unwrap(),
ProtocolError::UserAlreadyExists
);
@ -467,9 +473,7 @@ async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKe
secret: SymKey::ChaCha20Key([0; 32]),
peers: vec![],
});
let mut public_overlay_cnx = cnx
.overlay_connect(&repo, true)
.await?;
let mut public_overlay_cnx = cnx.overlay_connect(&repo, true).await?;
debug_println!("put_block");
@ -511,18 +515,14 @@ async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKe
debug_println!("GOT BLOCK {}", b.id());
}
let mut my_object_stream = public_overlay_cnx
.get_block(object_id, true, None)
.await?;
let mut my_object_stream = public_overlay_cnx.get_block(object_id, true, None).await?;
//.expect("get_block for object failed");
while let Some(b) = my_object_stream.next().await {
debug_println!("GOT BLOCK {}", b.id());
}
let object = public_overlay_cnx
.get_object(object_id, None)
.await?;
let object = public_overlay_cnx.get_object(object_id, None).await?;
//.expect("get_object failed");
debug_println!("GOT OBJECT with ID {}", object.id());
@ -534,9 +534,7 @@ async fn test(cnx: &mut impl BrokerConnection, pub_key: PubKey, priv_key: PrivKe
// debug_println!("COPIED OBJECT to OBJECT ID {}", object_id);
public_overlay_cnx
.delete_object(object_id)
.await?;
public_overlay_cnx.delete_object(object_id).await?;
//.expect("delete_object failed");
let res = public_overlay_cnx
@ -586,16 +584,12 @@ async fn test_remote_connection(url: &str) {
Ok(mut cnx) => {
if let Err(e) = test(&mut cnx, pub_key, priv_key).await {
debug_println!("error: {:?}", e)
}
else {
} else {
cnx.close().await;
} }
Err(e) => {
}
}
Err(e) => {}
}
}
#[xactor::main]
@ -624,7 +618,6 @@ mod test {
#[async_std::test]
pub async fn test_remote_cnx() -> Result<(), Box<dyn std::error::Error>> {
let thr = task::spawn(run_server_accept_one("127.0.0.1:3012"));
std::thread::sleep(std::time::Duration::from_secs(2));

@ -1,180 +0,0 @@
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use std::pin::Pin;
use debug_print::*;
use futures::future::BoxFuture;
use futures::future::OptionFuture;
use futures::FutureExt;
use p2p_net::actors::*;
use p2p_net::errors::*;
use p2p_net::types::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use rust_fsm::*;
// state_machine! {
// derive(Debug)
// AuthProtocolClient(Ready)
// Ready(ClientHelloSent) => ClientHelloSent,
// ClientHelloSent(ServerHelloReceived) => ServerHelloReceived,
// ServerHelloReceived(ClientAuthSent) => ClientAuthSent,
// ClientAuthSent(AuthResultReceived) => AuthResult,
// AuthResult => {
// Ok => BrokerProtocol,
// Error => Closed,
// },
// }
// state_machine! {
// derive(Debug)
// AuthProtocolServer(Ready)
// Ready(ClientHelloReceived) => ClientHelloReceived,
// ClientHelloReceived(ServerHelloSent) => ServerHelloSent,
// ServerHelloSent(ClientAuthReceived) => ClientAuthReceived,
// ClientAuthReceived => {
// Ok => AuthResultOk,
// Error => AuthResultError,
// },
// AuthResultOk(AuthResultSent) => BrokerProtocol,
// AuthResultError(AuthResultSent) => Closed,
// }
#[derive(Debug)]
pub struct AuthProtocolHandler {
//machine: StateMachine<AuthProtocolServer>,
nonce: Option<Vec<u8>>,
user: Option<PubKey>,
}
impl AuthProtocolHandler {
pub fn new() -> AuthProtocolHandler {
AuthProtocolHandler {
//machine: StateMachine::new(),
nonce: None,
user: None,
}
}
pub fn get_user(&self) -> Option<PubKey> {
self.user
}
pub fn handle_init(&mut self, client_hello: ClientHello) -> Result<Vec<u8>, ProtocolError> {
// let _ = self
// .machine
// .consume(&AuthProtocolServerInput::ClientHelloReceived)
// .map_err(|_e| ProtocolError::InvalidState)?;
let mut random_buf = [0u8; 32];
getrandom::getrandom(&mut random_buf).unwrap();
let nonce = random_buf.to_vec();
let reply = ServerHello::V0(ServerHelloV0 {
nonce: nonce.clone(),
});
self.nonce = Some(nonce);
// let _ = self
// .machine
// .consume(&AuthProtocolServerInput::ServerHelloSent)
// .map_err(|_e| ProtocolError::InvalidState)?;
//debug_println!("sending nonce to client: {:?}", self.nonce);
Ok(serde_bare::to_vec(&reply).unwrap())
}
pub fn handle_incoming(
&mut self,
frame: Vec<u8>,
) -> (
Result<Vec<u8>, ProtocolError>,
Pin<Box<OptionFuture<BoxFuture<'static, u16>>>>,
) {
fn prepare_reply(res: Result<Vec<u8>, ProtocolError>) -> AuthResult {
let (result, metadata) = match res {
Ok(m) => (0, m),
Err(e) => (e.into(), vec![]),
};
AuthResult::V0(AuthResultV0 { result, metadata })
}
fn process_state(
handler: &mut AuthProtocolHandler,
frame: Vec<u8>,
) -> Result<Vec<u8>, ProtocolError> {
// match handler.machine.state() {
// &AuthProtocolServerState::ServerHelloSent => {
let message = serde_bare::from_slice::<ClientAuth>(&frame)?;
// let _ = handler
// .machine
// .consume(&AuthProtocolServerInput::ClientAuthReceived)
// .map_err(|_e| ProtocolError::InvalidState)?;
// verifying client auth
debug_println!("verifying client auth");
let _ = verify(
&serde_bare::to_vec(&message.content_v0()).unwrap(),
message.sig(),
message.user(),
)
.map_err(|_e| ProtocolError::AccessDenied)?;
// debug_println!(
// "matching nonce : {:?} {:?}",
// message.nonce(),
// handler.nonce.as_ref().unwrap()
// );
if message.nonce() != handler.nonce.as_ref().unwrap() {
// let _ = handler
// .machine
// .consume(&AuthProtocolServerInput::Error)
// .map_err(|_e| ProtocolError::InvalidState);
return Err(ProtocolError::AccessDenied);
}
// TODO check that the device has been registered for this user. if not, return AccessDenied
// all is good, we advance the FSM and send back response
// let _ = handler
// .machine
// .consume(&AuthProtocolServerInput::Ok)
// .map_err(|_e| ProtocolError::InvalidState)?;
handler.user = Some(message.user());
Ok(vec![]) // without any metadata
//}
//_ => Err(ProtocolError::InvalidState),
//}
}
let res = process_state(self, frame);
let is_err = res.as_ref().err().cloned();
let reply = prepare_reply(res);
let reply_ser: Result<Vec<u8>, ProtocolError> = Ok(serde_bare::to_vec(&reply).unwrap());
if is_err.is_some() {
(
reply_ser,
Box::pin(OptionFuture::from(Some(
async move { reply.result() }.boxed(),
))),
)
} else {
(reply_ser, Box::pin(OptionFuture::from(None)))
}
}
}

@ -1,147 +0,0 @@
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Local Connection to a Broker
use futures::{
ready,
stream::Stream,
task::{Context, Poll},
Future,
select, FutureExt,
};
use futures::channel::mpsc;
use std::pin::Pin;
use std::{collections::HashSet, fmt::Debug};
use crate::server::BrokerServer;
use debug_print::*;
use futures::{pin_mut, stream, Sink, SinkExt, StreamExt};
use p2p_repo::object::*;
use p2p_repo::store::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use p2p_net::errors::*;
use p2p_net::types::*;
use p2p_net::broker_connection::*;
use std::collections::HashMap;
pub struct BrokerConnectionLocal<'a> {
broker: &'a mut BrokerServer,
user: PubKey,
}
#[async_trait::async_trait]
impl<'a> BrokerConnection for BrokerConnectionLocal<'a> {
type OC = BrokerConnectionLocal<'a>;
type BlockStream = async_channel::Receiver<Block>;
async fn close(&mut self) {}
async fn add_user(
&mut self,
user_id: PubKey,
admin_user_pk: PrivKey,
) -> Result<(), ProtocolError> {
let op_content = AddUserContentV0 { user: user_id };
let sig = sign(admin_user_pk, self.user, &serde_bare::to_vec(&op_content)?)?;
self.broker.add_user(self.user, user_id, sig)
}
async fn process_overlay_request(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<(), ProtocolError> {
match request {
BrokerOverlayRequestContentV0::OverlayConnect(_) => {
self.broker.connect_overlay(self.user, overlay)
}
BrokerOverlayRequestContentV0::OverlayJoin(j) => {
self.broker
.join_overlay(self.user, overlay, j.repo_pubkey(), j.secret(), j.peers())
}
BrokerOverlayRequestContentV0::ObjectPin(op) => {
self.broker.pin_object(self.user, overlay, op.id())
}
BrokerOverlayRequestContentV0::ObjectUnpin(op) => {
self.broker.unpin_object(self.user, overlay, op.id())
}
BrokerOverlayRequestContentV0::ObjectDel(op) => {
self.broker.del_object(self.user, overlay, op.id())
}
BrokerOverlayRequestContentV0::BlockPut(b) => {
self.broker.put_block(self.user, overlay, b.block())
}
_ => Err(ProtocolError::InvalidState),
}
}
async fn process_overlay_request_objectid_response(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<ObjectId, ProtocolError> {
match request {
BrokerOverlayRequestContentV0::ObjectCopy(oc) => {
self.broker
.copy_object(self.user, overlay, oc.id(), oc.expiry())
}
_ => Err(ProtocolError::InvalidState),
}
}
async fn process_overlay_request_stream_response(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<Pin<Box<Self::BlockStream>>, ProtocolError> {
match request {
BrokerOverlayRequestContentV0::BlockGet(b) => self
.broker
.get_block(self.user, overlay, b.id(), b.include_children(), b.topic())
.map(|r| Box::pin(r)),
BrokerOverlayRequestContentV0::BranchSyncReq(b) => self
.broker
.sync_branch(
self.user,
&overlay,
b.heads(),
b.known_heads(),
b.known_commits(),
)
.map(|r| Box::pin(r)),
_ => Err(ProtocolError::InvalidState),
}
}
async fn del_user(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {}
async fn add_client(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {}
async fn del_client(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {}
async fn overlay_connect(
&mut self,
repo_link: &RepoLink,
public: bool,
) -> Result<OverlayConnectionClient<BrokerConnectionLocal<'a>>, ProtocolError> {
let overlay = self.process_overlay_connect(repo_link, public).await?;
Ok(OverlayConnectionClient::create(self, overlay, repo_link.clone()))
}
}
impl<'a> BrokerConnectionLocal<'a> {
pub fn new(broker: &'a mut BrokerServer, user: PubKey) -> BrokerConnectionLocal<'a> {
BrokerConnectionLocal { broker, user }
}
}

@ -1,10 +1,3 @@
pub mod broker_store;
pub mod connection_local;
pub mod server;
pub mod server_ws;
pub mod auth;

@ -16,14 +16,12 @@ use std::pin::Pin;
use std::sync::Arc;
use std::sync::RwLock;
use crate::auth::*;
use crate::broker_store::account::Account;
use crate::broker_store::config::Config;
use crate::broker_store::config::ConfigMode;
use crate::broker_store::overlay::Overlay;
use crate::broker_store::peer::Peer;
use crate::broker_store::repostoreinfo::RepoStoreInfo;
use crate::connection_local::BrokerConnectionLocal;
use async_std::task;
use debug_print::*;
use futures::future::BoxFuture;

@ -1,3 +0,0 @@
pub struct ServerConnection {}
impl ServerConnection {}

@ -12,7 +12,6 @@
//! WebSocket implementation of the Broker
use crate::broker_store::config::ConfigMode;
use crate::server::*;
use async_std::net::{TcpListener, TcpStream};
use async_std::sync::Mutex;
use async_std::task;
@ -35,116 +34,48 @@ use std::sync::Arc;
use std::{thread, time};
use tempfile::Builder;
async fn connection_loop(tcp: TcpStream, mut handler: ProtocolHandler) -> std::io::Result<()> {
let addr = tcp.peer_addr().unwrap();
handler.register(addr);
pub async fn accept(tcp: TcpStream, peer_priv_key: Sensitive<[u8; 32]>, peer_pub_key: PubKey) {
let sock_addr = tcp.peer_addr().unwrap();
let ip = sock_addr.ip();
let mut ws = accept_async(tcp).await.unwrap();
let (mut tx, mut rx) = ws.split();
let mut tx_mutex = Arc::new(Mutex::new(tx));
// setup the async frames task
let receiver = handler.async_frames_receiver();
let ws_in_task = Arc::clone(&tx_mutex);
task::spawn(async move {
while let Ok(frame) = receiver.recv().await {
let mut sink = ws_in_task.lock().await;
if sink.send(Message::binary(frame)).await.is_err() {
break;
}
}
debug_println!("end of async frames loop");
let mut sink = ws_in_task.lock().await;
let _ = sink.send(Message::Close(None)).await;
let _ = sink.close().await;
});
while let Some(msg) = rx.next().await {
//debug_println!("RCV: {:?}", msg);
let msg = match msg {
Err(e) => {
debug_println!("Error on server stream: {:?}", e);
// Errors returned directly through the AsyncRead/Write API are fatal, generally an error on the underlying
// transport. closing connection
break;
}
Ok(m) => m,
};
//TODO implement PING messages
if msg.is_close() {
debug_println!("CLOSE from CLIENT");
if let Message::Close(Some(cf)) = msg {
debug_println!("CLOSE FRAME {:?}", cf);
} else if let Message::Close(None) = msg {
debug_println!("without CLOSE FRAME");
}
break;
} else if msg.is_binary() {
//debug_println!("server received binary: {:?}", msg);
let cws = ConnectionWebSocket {};
let base = cws.accept(peer_priv_key, peer_pub_key, ws).await.unwrap();
let replies = handler.handle_incoming(msg.into_data()).await;
//TODO FIXME get remote_peer_id from ConnectionBase (once it is available)
let (priv_key, pub_key) = generate_keypair();
let remote_peer_id = pub_key;
match replies.0 {
Err(e) => {
debug_println!("Protocol Error: {:?}", e);
// dealing with ProtocolErrors (closing the connection)
break;
}
Ok(r) => {
if tx_mutex
.lock()
.await
.send(Message::binary(r))
let res = BROKER
.write()
.await
.is_err()
{
//dealing with sending errors (closing the connection)
break;
}
}
}
match replies.1.await {
Some(errcode) => {
if errcode > 0 {
debug_println!("Close due to error code : {:?}", errcode);
//closing connection
break;
}
}
None => {}
}
}
}
handler.deregister();
let mut sink = tx_mutex.lock().await;
let _ = sink.send(Message::Close(None)).await;
let _ = sink.close().await;
debug_println!("end of sync read+write loop");
Ok(())
.accept(base, IP::try_from(&ip).unwrap(), None, remote_peer_id)
.await;
}
pub async fn run_server_accept_one(addrs: &str) -> std::io::Result<()> {
let root = tempfile::Builder::new()
.prefix("node-daemon")
.tempdir()
.unwrap();
pub async fn run_server_accept_one(
addrs: &str,
peer_priv_key: Sensitive<[u8; 32]>,
peer_pub_key: PubKey,
) -> std::io::Result<()> {
let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap();
let master_key: [u8; 32] = [0; 32];
std::fs::create_dir_all(root.path()).unwrap();
println!("{}", root.path().to_str().unwrap());
let store = LmdbBrokerStore::open(root.path(), master_key);
let server: BrokerServer =
BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
// TODO: remove this part
// let server: BrokerServer =
// BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
// let server_arc = Arc::new(server);
let socket = TcpListener::bind(addrs).await?;
debug_println!("Listening on {}", addrs);
let mut connections = socket.incoming();
let server_arc = Arc::new(server);
let tcp = connections.next().await.unwrap()?;
let proto_handler = Arc::clone(&server_arc).protocol_handler();
let _handle = task::spawn(connection_loop(tcp, proto_handler));
accept(tcp, peer_priv_key, peer_pub_key).await;
Ok(())
}
@ -154,46 +85,27 @@ pub async fn run_server(
peer_priv_key: Sensitive<[u8; 32]>,
peer_pub_key: PubKey,
) -> std::io::Result<()> {
let root = tempfile::Builder::new()
.prefix("node-daemon")
.tempdir()
.unwrap();
let root = tempfile::Builder::new().prefix("ngd").tempdir().unwrap();
let master_key: [u8; 32] = [0; 32];
std::fs::create_dir_all(root.path()).unwrap();
println!("{}", root.path().to_str().unwrap());
let store = LmdbBrokerStore::open(root.path(), master_key);
let server: BrokerServer =
BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
// TODO: remove this part
// let server: BrokerServer =
// BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
// let server_arc = Arc::new(server);
let socket = TcpListener::bind(addrs).await?;
debug_println!("Listening on {}", addrs);
let mut connections = socket.incoming();
let server_arc = Arc::new(server);
while let Some(tcp) = connections.next().await {
let tcp = tcp.unwrap();
let sock_addr = tcp.peer_addr().unwrap();
let ip = sock_addr.ip();
let mut ws = accept_async(tcp).await.unwrap();
let cws = ConnectionWebSocket {};
let base = cws
.accept(
while let Some(tcp) = connections.next().await {
accept(
tcp.unwrap(),
Sensitive::<[u8; 32]>::from_slice(peer_priv_key.deref()),
peer_pub_key,
ws,
)
.await
.unwrap();
//TODO FIXME get remote_peer_id from ConnectionBase (once it is available)
let (priv_key, pub_key) = generate_keypair();
let remote_peer_id = pub_key;
let res = BROKER
.write()
.await
.accept(base, IP::try_from(&ip).unwrap(), None, remote_peer_id)
.await;
}
Ok(())

@ -35,4 +35,3 @@ features = ["js"]
getrandom = "0.2.7"
xactor = "0.7.11"
async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] }
p2p-client = { path = "../p2p-client" }

@ -1,93 +0,0 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
//! WebSocket Remote Connection to a Broker
use debug_print::*;
use futures::{future, pin_mut, stream, SinkExt, StreamExt};
use p2p_client::connection_remote::*;
use p2p_net::broker_connection::*;
use p2p_net::errors::*;
use p2p_net::types::*;
use p2p_repo::types::*;
use p2p_repo::utils::{generate_keypair, now_timestamp};
use async_tungstenite::async_std::connect_async;
use async_tungstenite::client_async;
use async_tungstenite::tungstenite::{Error, Message};
pub struct BrokerConnectionWebSocket {}
impl BrokerConnectionWebSocket {
pub async fn open(
url: &str,
priv_key: PrivKey,
pub_key: PubKey,
) -> Result<impl BrokerConnection, ProtocolError> {
let res = connect_async(url).await;
match (res) {
Ok((ws, _)) => {
debug_println!("WebSocket handshake completed");
let (write, read) = ws.split();
let mut frames_stream_read = read.map(|msg_res| match msg_res {
Err(e) => {
debug_println!("ERROR {:?}", e);
vec![]
}
Ok(message) => {
if message.is_close() {
debug_println!("CLOSE FROM SERVER");
vec![]
} else {
message.into_data()
}
}
});
async fn transform(message: Vec<u8>) -> Result<Message, Error> {
if message.len() == 0 {
debug_println!("sending CLOSE message to SERVER");
Ok(Message::Close(None))
} else {
Ok(Message::binary(message))
}
}
let frames_stream_write = write
.with(|message| transform(message))
.sink_map_err(|e| ProtocolError::WriteError);
let master_key: [u8; 32] = [0; 32];
let mut cnx_res = ConnectionRemote::open_broker_connection(
frames_stream_write,
frames_stream_read,
pub_key,
priv_key,
PubKey::Ed25519PubKey([1; 32]),
)
.await;
match cnx_res {
Ok(mut cnx) => Ok(cnx),
Err(e) => {
debug_println!("cannot connect {:?}", e);
Err(e)
}
}
}
Err(e) => {
debug_println!("Cannot connect: {:?}", e);
Err(ProtocolError::ConnectionError)
}
}
}
}

@ -44,8 +44,6 @@ macro_rules! after {
};
}
//pub mod connection_ws;
#[cfg(not(target_arch = "wasm32"))]
pub mod remote_ws;

@ -61,65 +61,16 @@ impl IConnect for ConnectionWebSocket {
Err(NetError::ConnectionError)
}
Ok((mut websocket, _)) => {
//let ws = Arc::new(Mutex::new(Box::pin(websocket)));
// let (write, read) = ws.split();
// let mut stream_read = read.map(|msg_res| match msg_res {
// Err(e) => {
// debug_println!("READ ERROR {:?}", e);
// ConnectionCommand::Error(NetError::IoError)
// }
// Ok(message) => {
// if message.is_close() {
// debug_println!("CLOSE FROM SERVER");
// ConnectionCommand::Close
// } else {
// ConnectionCommand::Msg(
// serde_bare::from_slice::<ProtocolMessage>(&message.into_data())
// .unwrap(),
// )
// }
// }
// });
// async fn write_transform(cmd: ConnectionCommand) -> Result<Message, Error> {
// match cmd {
// ConnectionCommand::Error(_) => Err(Error::AlreadyClosed), //FIXME
// ConnectionCommand::ProtocolError(_) => Err(Error::AlreadyClosed), //FIXME
// ConnectionCommand::Close => {
// // todo close cnx. }
// Err(Error::AlreadyClosed)
// }
// ConnectionCommand::Msg(msg) => Ok(Message::binary(
// serde_bare::to_vec(&msg)
// .map_err(|_| Error::AlreadyClosed) //FIXME
// .unwrap(),
// )),
// }
// }
// let stream_write = write
// .with(|message| write_transform(message))
// .sink_map_err(|e| NetError::IoError);
// ws.close(Some(CloseFrame {
// code: CloseCode::Library(4000),
// reason: std::borrow::Cow::Borrowed(""),
// }))
// .await;
cnx.start_read_loop(peer_privk, Some(remote_peer));
let s = cnx.take_sender();
let r = cnx.take_receiver();
let mut shutdown = cnx.set_shutdown();
//let ws_in_task = Arc::clone(&ws);
let join = task::spawn(async move {
debug_println!("START of WS loop");
//let w = ws_in_task.lock().await;
let res = ws_loop(websocket, s, r).await;
// .close(Some(CloseFrame {
// code: CloseCode::Library(4000),
// reason: std::borrow::Cow::Borrowed(""),
// }))
// .await;
if res.is_err() {
let _ = shutdown.send(res.err().unwrap()).await;
}
@ -128,37 +79,7 @@ impl IConnect for ConnectionWebSocket {
cnx.start(config).await;
//spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver()));
//
//cnx.close().await;
//// let res = cnx.join_shutdown().await;
//// log!("JOIN SHUTDOWN {:?}", res);
// cnx.send(ConnectionCommand::Close).await;
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
// StartProtocol::Auth(ClientHello::V0()),
// )))
// .await;
//cnx.close().await;
// let _ = cnx.inject(last_command).await;
// let _ = cnx.close_streams().await;
// Note that since WsMeta::connect resolves to an opened connection, we don't see
// any Open events here.
//
//assert!(evts.next().await.unwrap_throw().is_closing());
// TODO wait for close
//log!("WS closed {:?}", last_event.clone());
Ok(cnx)
//Ok(())
}
}
}
@ -326,7 +247,6 @@ async fn ws_loop(
return Err(e);
}
}
//log!("END OF LOOP");
Ok(())
}

@ -42,7 +42,6 @@ impl IConnect for ConnectionWebSocket {
remote_peer: DirectPeerId,
config: StartConfig,
) -> Result<ConnectionBase, NetError> {
//pub async fn testt(url: &str) -> ResultSend<()> {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let url = format!("ws://{}:{}", ip, WS_PORT);
@ -52,9 +51,6 @@ impl IConnect for ConnectionWebSocket {
NetError::ConnectionError
})?;
//let (mut sender_tx, sender_rx) = mpsc::unbounded();
//let (mut receiver_tx, receiver_rx) = mpsc::unbounded();
cnx.start_read_loop(peer_privk, Some(remote_peer));
let mut shutdown = cnx.set_shutdown();
@ -68,32 +64,6 @@ impl IConnect for ConnectionWebSocket {
cnx.start(config).await;
//spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone()));
//cnx.close().await;
// spawn_and_log_error(async move {
// TimeoutFuture::new(10_000).await;
// cnx.close().await;
// Ok(())
// // // Do something here after the one second timeout is up!
// });
// cnx.send(ConnectionCommand::Close).await;
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
// StartProtocol::Auth(ClientHello::V0()),
// )))
// .await;
//log!("waiting...");
//let res = join.next().await;
//log!("finished...");
//log!("JOIN SHUTDOWN {:?}", res);
// Note that since WsMeta::connect resolves to an opened connection, we don't see
// any Open events here.
//
//assert!(events.next().await.unwrap_throw().is_closing());
//Ok(cnx)
Ok(cnx)
}
}
@ -184,7 +154,6 @@ async fn ws_loop(
}
}
log!("waiting for closing event");
let last_event = events.next().await;
log!("WS closed {:?}", last_event.clone());
let last_command = match last_event {

@ -1,25 +0,0 @@
[package]
name = "p2p-client"
version = "0.1.0"
edition = "2021"
license = "MIT/Apache-2.0"
authors = ["Niko PLP <niko@nextgraph.org>"]
description = "P2P Client module of NextGraph"
repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs"
[dependencies]
debug_print = "1.0.0"
p2p-repo = { path = "../p2p-repo" }
p2p-net = { path = "../p2p-net" }
chacha20 = "0.9.0"
serde = { version = "1.0", features = ["derive"] }
serde_bare = "0.5.0"
serde_bytes = "0.11.7"
xactor = "0.7.11"
async-trait = "0.1.64"
async-std = { version = "1.12.0", features = ["attributes"] }
futures = "0.3.24"
async-channel = "1.7.1"
async-oneshot = "0.5.0"
snow = "0.9.2"

@ -1,598 +0,0 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
//! Remote Connection to a Broker
use async_std::task;
use async_std::sync::Mutex;
use futures::{
ready,
stream::Stream,
task::{Context, Poll},
Future,
select, FutureExt,
};
use futures::channel::mpsc;
use std::pin::Pin;
use std::{collections::HashSet, fmt::Debug};
use async_oneshot::oneshot;
use debug_print::*;
use futures::{pin_mut, stream, Sink, SinkExt, StreamExt};
use p2p_repo::object::*;
use p2p_repo::store::*;
use p2p_repo::types::*;
use p2p_repo::utils::*;
use p2p_net::errors::*;
use p2p_net::types::*;
use p2p_net::broker_connection::*;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use xactor::{message, spawn, Actor, Addr, Handler, WeakAddr};
#[message]
struct BrokerMessageXActor(BrokerMessage);
struct BrokerMessageActor {
r: Option<async_oneshot::Receiver<BrokerMessage>>,
s: async_oneshot::Sender<BrokerMessage>,
}
impl Actor for BrokerMessageActor {}
impl BrokerMessageActor {
fn new() -> BrokerMessageActor {
let (s, r) = oneshot::<BrokerMessage>();
BrokerMessageActor { r: Some(r), s }
}
fn resolve(&mut self, msg: BrokerMessage) {
let _ = self.s.send(msg);
}
fn receiver(&mut self) -> async_oneshot::Receiver<BrokerMessage> {
self.r.take().unwrap()
}
}
struct BrokerMessageStreamActor {
r: Option<async_channel::Receiver<Block>>,
s: async_channel::Sender<Block>,
error_r: Option<async_oneshot::Receiver<Option<ProtocolError>>>,
error_s: Option<async_oneshot::Sender<Option<ProtocolError>>>,
}
impl Actor for BrokerMessageStreamActor {}
impl BrokerMessageStreamActor {
fn new() -> BrokerMessageStreamActor {
let (s, r) = async_channel::unbounded::<Block>();
let (error_s, error_r) = oneshot::<Option<ProtocolError>>();
BrokerMessageStreamActor {
r: Some(r),
s,
error_r: Some(error_r),
error_s: Some(error_s),
}
}
async fn partial(&mut self, block: Block) -> Result<(), ProtocolError> {
//debug_println!("GOT PARTIAL {:?}", block.id());
self.s
.send(block)
.await
.map_err(|e| ProtocolError::WriteError)
}
fn receiver(&mut self) -> async_channel::Receiver<Block> {
self.r.take().unwrap()
}
fn error_receiver(&mut self) -> async_oneshot::Receiver<Option<ProtocolError>> {
self.error_r.take().unwrap()
}
fn send_error(&mut self, err: Option<ProtocolError>) {
if self.error_s.is_some() {
let _ = self.error_s.take().unwrap().send(err);
self.error_s = None;
}
}
fn close(&mut self) {
self.s.close();
}
}
#[async_trait::async_trait]
impl Handler<BrokerMessageXActor> for BrokerMessageActor {
async fn handle(&mut self, ctx: &mut xactor::Context<Self>, msg: BrokerMessageXActor) {
//println!("handling {:?}", msg.0);
self.resolve(msg.0);
ctx.stop(None);
}
}
#[async_trait::async_trait]
impl Handler<BrokerMessageXActor> for BrokerMessageStreamActor {
async fn handle(&mut self, ctx: &mut xactor::Context<Self>, msg: BrokerMessageXActor) {
//println!("handling {:?}", msg.0);
let res: Result<Option<Block>, ProtocolError> = msg.0.into();
match res {
Err(e) => {
self.send_error(Some(e));
ctx.stop(None);
self.close();
}
Ok(Some(b)) => {
self.send_error(None);
// it must be a partial content
let res = self.partial(b).await;
if let Err(e) = res {
ctx.stop(None);
self.close();
}
}
Ok(None) => {
self.send_error(None);
ctx.stop(None);
self.close();
}
}
}
}
pub struct ConnectionRemote {}
impl ConnectionRemote {
pub async fn ext_request<
B: Stream<Item = Vec<u8>> + StreamExt + Send + Sync,
A: Sink<Vec<u8>, Error = ProtocolError> + Send,
>(
w: A,
r: B,
request: ExtRequest,
) -> Result<ExtResponse, ProtocolError> {
unimplemented!();
}
async fn close<S>(w: S, err: ProtocolError) -> ProtocolError
where
S: Sink<Vec<u8>, Error = ProtocolError>,
{
let mut writer = Box::pin(w);
let _ = writer.send(vec![]);
let _ = writer.close().await;
err
}
pub async fn open_broker_connection<
B: Stream<Item = Vec<u8>> + StreamExt + Send + Sync + 'static,
A: Sink<Vec<u8>, Error = ProtocolError> + Send + 'static,
>(
w: A,
r: B,
user: PubKey,
user_pk: PrivKey,
client: PubKey,
) -> Result<impl BrokerConnection, ProtocolError> {
let mut writer = Box::pin(w);
writer
.send(serde_bare::to_vec(&StartProtocol::Auth(ClientHello::V0()))?)
.await
.map_err(|_e| ProtocolError::WriteError)?;
let mut reader = Box::pin(r);
let answer = reader.next().await;
if answer.is_none() {
return Err(Self::close(writer, ProtocolError::InvalidState).await);
}
let server_hello = serde_bare::from_slice::<ServerHello>(&answer.unwrap())?;
//debug_println!("received nonce from server: {:?}", server_hello.nonce());
let content = ClientAuthContentV0 {
user,
client,
nonce: server_hello.nonce().clone(),
};
let sig = sign(user_pk, user, &serde_bare::to_vec(&content)?)
.map_err(|_e| ProtocolError::SignatureError)?;
let auth_ser = serde_bare::to_vec(&ClientAuth::V0(ClientAuthV0 { content, sig }))?;
//debug_println!("AUTH SENT {:?}", auth_ser);
writer
.send(auth_ser)
.await
.map_err(|_e| ProtocolError::WriteError)?;
let answer = reader.next().await;
if answer.is_none() {
//return Err(ProtocolError::InvalidState);
return Err(Self::close(writer, ProtocolError::InvalidState).await);
}
let auth_result = serde_bare::from_slice::<AuthResult>(&answer.unwrap())?;
match auth_result.result() {
0 => {
async fn transform(message: BrokerMessage) -> Result<Vec<u8>, ProtocolError> {
if message.is_close() {
Ok(vec![])
} else {
Ok(serde_bare::to_vec(&message)?)
}
}
let messages_stream_write = writer.with(|message| transform(message));
let mut messages_stream_read = reader.map(|message| {
if message.len() == 0 {
BrokerMessage::Close
} else {
match serde_bare::from_slice::<BrokerMessage>(&message) {
Err(e) => BrokerMessage::Close,
Ok(m) => m
}
}
});
let cnx =
BrokerConnectionRemote::open(messages_stream_write, messages_stream_read, user);
Ok(cnx)
}
err => Err(Self::close(writer, ProtocolError::try_from(err).unwrap()).await),
}
}
}
pub struct BrokerConnectionRemote<T>
where
T: Sink<BrokerMessage> + Send + 'static,
{
writer: Arc<Mutex<Pin<Box<T>>>>,
user: PubKey,
actors: Arc<RwLock<HashMap<u64, WeakAddr<BrokerMessageActor>>>>,
stream_actors: Arc<RwLock<HashMap<u64, WeakAddr<BrokerMessageStreamActor>>>>,
shutdown: mpsc::UnboundedSender<Void>,
}
#[async_trait::async_trait]
impl<T> BrokerConnection for BrokerConnectionRemote<T>
where
T: Sink<BrokerMessage> + Send,
{
type OC = BrokerConnectionRemote<T>;
type BlockStream = async_channel::Receiver<Block>;
async fn close(&mut self) {
let _ = self.shutdown.close().await;
let mut w = self.writer.lock().await;
let _ = w.send(BrokerMessage::Close).await;
let _ = w.close().await;
}
async fn process_overlay_request_stream_response(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<Pin<Box<Self::BlockStream>>, ProtocolError> {
let mut actor = BrokerMessageStreamActor::new();
let receiver = actor.receiver();
let error_receiver = actor.error_receiver();
let mut addr = actor
.start()
.await
.map_err(|_e| ProtocolError::ActorError)?;
let request_id = addr.actor_id();
//debug_println!("actor ID {}", request_id);
{
let mut map = self.stream_actors.write().expect("RwLock poisoned");
map.insert(request_id, addr.downgrade());
}
let mut w = self.writer.lock().await;
w.send(BrokerMessage::V0(BrokerMessageV0 {
padding: vec![], //FIXME implement padding
content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0(
BrokerOverlayMessageV0 {
overlay,
content: BrokerOverlayMessageContentV0::BrokerOverlayRequest(
BrokerOverlayRequest::V0(BrokerOverlayRequestV0 {
id: request_id,
content: request,
}),
),
},
)),
}))
.await
.map_err(|_e| ProtocolError::WriteError)?;
//debug_println!("waiting for first reply");
let reply = error_receiver.await;
match reply {
Err(_e) => {
Err(ProtocolError::Closing)
}
Ok(Some(e)) => {
let mut map = self.stream_actors.write().expect("RwLock poisoned");
map.remove(&request_id);
return Err(e);
}
Ok(None) => {
let stream_actors_in_thread = Arc::clone(&self.stream_actors);
task::spawn(async move {
addr.wait_for_stop().await; // TODO add timeout
let mut map = stream_actors_in_thread.write().expect("RwLock poisoned");
map.remove(&request_id);
});
Ok(Box::pin(receiver))
}
}
}
async fn process_overlay_request_objectid_response(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<ObjectId, ProtocolError> {
before!(self, request_id, addr, receiver);
self.writer.lock().await
.send(BrokerMessage::V0(BrokerMessageV0 {
padding: vec![], // FIXME implement padding
content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0(
BrokerOverlayMessageV0 {
overlay,
content: BrokerOverlayMessageContentV0::BrokerOverlayRequest(
BrokerOverlayRequest::V0(BrokerOverlayRequestV0 {
id: request_id,
content: request,
}),
),
},
)),
}))
.await
.map_err(|_e| ProtocolError::WriteError)?;
after!(self, request_id, addr, receiver, reply);
reply.into()
}
async fn process_overlay_request(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<(), ProtocolError> {
before!(self, request_id, addr, receiver);
self.writer.lock().await
.send(BrokerMessage::V0(BrokerMessageV0 {
padding: vec![], // FIXME implement padding
content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0(
BrokerOverlayMessageV0 {
overlay,
content: BrokerOverlayMessageContentV0::BrokerOverlayRequest(
BrokerOverlayRequest::V0(BrokerOverlayRequestV0 {
id: request_id,
content: request,
}),
),
},
)),
}))
.await
.map_err(|_e| ProtocolError::WriteError)?;
after!(self, request_id, addr, receiver, reply);
reply.into()
}
async fn add_user(
&mut self,
user_id: PubKey,
admin_user_pk: PrivKey,
) -> Result<(), ProtocolError> {
before!(self, request_id, addr, receiver);
let op_content = AddUserContentV0 { user: user_id };
let sig = sign(
admin_user_pk,
self.user,
&serde_bare::to_vec(&op_content)?,
)?;
self.writer.lock().await
.send(BrokerMessage::V0(BrokerMessageV0 {
padding: vec![], // TODO implement padding
content: BrokerMessageContentV0::BrokerRequest(BrokerRequest::V0(
BrokerRequestV0 {
id: request_id,
content: BrokerRequestContentV0::AddUser(AddUser::V0(AddUserV0 {
content: op_content,
sig,
})),
},
)),
}))
.await
.map_err(|_e| ProtocolError::WriteError)?;
after!(self, request_id, addr, receiver, reply);
reply.into()
}
async fn del_user(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {}
async fn add_client(&mut self, client_id: ClientId, user_pk: PrivKey) {}
async fn del_client(&mut self, client_id: ClientId, user_pk: PrivKey) {}
async fn overlay_connect(
&mut self,
repo_link: &RepoLink,
public: bool,
) -> Result<OverlayConnectionClient<BrokerConnectionRemote<T>>, ProtocolError> {
let overlay = self.process_overlay_connect(repo_link, public).await?;
Ok(OverlayConnectionClient::create(self, overlay,repo_link.clone() ))
}
}
#[derive(Debug)]
enum Void {}
impl<T> BrokerConnectionRemote<T>
where
T: Sink<BrokerMessage> + Send,
{
async fn connection_reader_loop<
U: Stream<Item = BrokerMessage> + StreamExt + Send + Sync + Unpin + 'static,
>(
stream: U,
actors: Arc<RwLock<HashMap<u64, WeakAddr<BrokerMessageActor>>>>,
stream_actors: Arc<RwLock<HashMap<u64, WeakAddr<BrokerMessageStreamActor>>>>,
shutdown: mpsc::UnboundedReceiver<Void>,
) -> Result<(), ProtocolError> {
let mut s = stream.fuse();
let mut shutdown = shutdown.fuse();
loop {
select! {
void = shutdown.next().fuse() => match void {
Some(void) => match void {},
None => break,
},
message = s.next().fuse() => match message {
Some(message) =>
{
//debug_println!("GOT MESSAGE {:?}", message);
if message.is_close() {
// releasing the blocking calls on the actors
let map = actors.read().expect("RwLock poisoned");
for (a) in map.values() {
if let Some(mut addr) = a.upgrade() {
let _ = addr.stop(Some(ProtocolError::Closing.into()));
}
}
let map2 = stream_actors.read().expect("RwLock poisoned");
for (a) in map2.values() {
if let Some(mut addr) = a.upgrade() {
let _ = addr.stop(Some(ProtocolError::Closing.into()));
}
}
return Err(ProtocolError::Closing);
}
if message.is_request() {
debug_println!("is request {}", message.id());
// closing connection. a client is not supposed to receive requests.
return Err(ProtocolError::Closing);
} else if message.is_response() {
let id = message.id();
//debug_println!("is response for {}", id);
{
let map = actors.read().expect("RwLock poisoned");
match map.get(&id) {
Some(weak_addr) => match weak_addr.upgrade() {
Some(addr) => {
addr.send(BrokerMessageXActor(message))
.map_err(|e| ProtocolError::Closing)?
//.expect("sending message back to actor failed");
}
None => {
debug_println!("ERROR. Addr is dead for ID {}", id);
return Err(ProtocolError::Closing);
}
},
None => {
let map2 = stream_actors.read().expect("RwLock poisoned");
match map2.get(&id) {
Some(weak_addr) => match weak_addr.upgrade() {
Some(addr) => {
addr.send(BrokerMessageXActor(message))
.map_err(|e| ProtocolError::Closing)?
//.expect("sending message back to stream actor failed");
}
None => {
debug_println!(
"ERROR. Addr is dead for ID {} {:?}",
id,
message
);
return Err(ProtocolError::Closing);
}
},
None => {
debug_println!("Actor ID not found {} {:?}", id, message);
return Err(ProtocolError::Closing);
}
}
}
}
}
}
},
None => break,
}
}
}
Ok(())
}
pub fn open<U: Stream<Item = BrokerMessage> + StreamExt + Send + Sync + Unpin + 'static>(
writer: T,
reader: U,
user: PubKey,
) -> BrokerConnectionRemote<T> {
let actors: Arc<RwLock<HashMap<u64, WeakAddr<BrokerMessageActor>>>> =
Arc::new(RwLock::new(HashMap::new()));
let stream_actors: Arc<RwLock<HashMap<u64, WeakAddr<BrokerMessageStreamActor>>>> =
Arc::new(RwLock::new(HashMap::new()));
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
let w = Arc::new(Mutex::new(Box::pin(writer)));
let ws_in_task = Arc::clone(&w);
let actors_in_thread = Arc::clone(&actors);
let stream_actors_in_thread = Arc::clone(&stream_actors);
task::spawn(async move {
debug_println!("START of reader loop");
if let Err(e) =
Self::connection_reader_loop(reader, actors_in_thread, stream_actors_in_thread, shutdown_receiver)
.await
{
debug_println!("closing because of {}", e);
let _ = ws_in_task.lock().await.close().await;
}
debug_println!("END of reader loop");
});
BrokerConnectionRemote::<T> {
writer: Arc::clone(&w),
user,
actors: Arc::clone(&actors),
stream_actors: Arc::clone(&stream_actors),
shutdown:shutdown_sender ,
}
}
}

@ -1,47 +0,0 @@
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
#[macro_export]
macro_rules! before {
( $self:expr, $request_id:ident, $addr:ident, $receiver:ident ) => {
let mut actor = BrokerMessageActor::new();
let $receiver = actor.receiver();
let mut $addr = actor
.start()
.await
.map_err(|_e| ProtocolError::ActorError)?;
let $request_id = $addr.actor_id();
//debug_println!("actor ID {}", $request_id);
{
let mut map = $self.actors.write().expect("RwLock poisoned");
map.insert($request_id, $addr.downgrade());
}
};
}
macro_rules! after {
( $self:expr, $request_id:ident, $addr:ident, $receiver:ident, $reply:ident ) => {
//debug_println!("waiting for reply");
$addr.wait_for_stop().await; // TODO add timeout and close connection if there's no reply
let r = $receiver.await;
if r.is_err() {
return Err(ProtocolError::Closing);
}
let $reply = r.unwrap();
//debug_println!("reply arrived {:?}", $reply);
{
let mut map = $self.actors.write().expect("RwLock poisoned");
map.remove(&$request_id);
}
};
}
//pub mod connection_remote;

@ -1,3 +1,14 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use async_std::stream::StreamExt;
use async_std::sync::{Mutex, MutexGuard};
use futures::{channel::mpsc, SinkExt};
@ -10,12 +21,6 @@ use crate::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use crate::{connection::*, errors::ProtocolError, log, types::ProtocolMessage};
use std::marker::PhantomData;
// pub trait BrokerRequest: std::fmt::Debug {
// fn send(&self) -> ProtocolMessage;
// }
//pub trait BrokerResponse: TryFrom<ProtocolMessage> + std::fmt::Debug {}
impl TryFrom<ProtocolMessage> for () {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
@ -25,12 +30,9 @@ impl TryFrom<ProtocolMessage> for () {
#[async_trait::async_trait]
pub trait EActor: Send + Sync + std::fmt::Debug {
//type T: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug;
//async fn handle(&mut self, msg: ProtocolMessage);
async fn respond(
&mut self,
msg: ProtocolMessage,
//stream: Option<impl BrokerRequest + std::marker::Send>,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError>;
}
@ -49,37 +51,6 @@ pub struct Actor<
initiator: bool,
}
// #[async_trait::async_trait]
// impl<
// A: BrokerRequest + std::marker::Sync + 'static,
// B: TryFrom<ProtocolMessage, Error = ProtocolError>
// + std::fmt::Debug
// + std::marker::Sync
// + 'static,
// > EActor for Actor<'_, A, B>
// {
// //type T = B;
// // async fn handle(&mut self, msg: ProtocolMessage) {
// // if self.initiator && msg.type_id() == TypeId::of::<B>()
// // || !self.initiator && msg.type_id() == TypeId::of::<A>()
// // {
// // let _ = self.receiver_tx.send(ConnectionCommand::Msg(msg)).await;
// // } else {
// // log!("NOT OK");
// // }
// // }
// // async fn respond(id: i64, msg: A) -> Result<B, ProtocolError> {
// // let mut actor = Box::new(Actor::<A, B>::new(id, false));
// // //actor.process_request
// // match self.receiver.next().await {
// // Some(msg) => B::receive(msg),
// // _ => Err(ProtocolError::ActorError),
// // }
// // }
// }
pub enum SoS<B> {
Single(B),
Stream(Receiver<B>),
@ -139,10 +110,8 @@ impl<
pub async fn request(
&mut self,
msg: ProtocolMessage,
//stream: Option<impl BrokerRequest + std::marker::Send>,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<SoS<B>, ProtocolError> {
//sender.send(ConnectionCommand::Msg(msg.send())).await;
fsm.lock().await.send(msg).await?;
let mut receiver = self.receiver.take().unwrap();
match receiver.next().await {

@ -1,3 +1,14 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use std::sync::Arc;
use crate::{actor::*, connection::NoiseFSM, errors::ProtocolError, types::ProtocolMessage};
@ -24,12 +35,6 @@ impl Noise {
}
}
// impl BrokerRequest for Noise {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::Noise(self.clone())
// }
// }
impl From<Noise> for ProtocolMessage {
fn from(msg: Noise) -> ProtocolMessage {
ProtocolMessage::Noise(msg)

@ -1,3 +1,14 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::actors::noise::Noise;
use crate::connection::NoiseFSM;
use crate::types::ExtResponse;
@ -102,12 +113,6 @@ impl ServerHello {
}
}
// impl BrokerRequest for ClientHello {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::Start(StartProtocol::Client(ClientHello::Local))
// }
// }
impl From<ClientHello> for ProtocolMessage {
fn from(msg: ClientHello) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Client(msg))
@ -136,12 +141,6 @@ impl TryFrom<ProtocolMessage> for ServerHello {
}
}
// impl BrokerRequest for ServerHello {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::ServerHello(self.clone())
// }
// }
impl From<ServerHello> for ProtocolMessage {
fn from(msg: ServerHello) -> ProtocolMessage {
ProtocolMessage::ServerHello(msg)

@ -1,3 +1,14 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::actor::*;
use crate::connection::*;
use crate::errors::*;
@ -225,10 +236,8 @@ impl Broker {
}
// TODO check that not already connected to peer
//IpAddr::from_str("127.0.0.1");
//cnx.open(url, peer_pubk, peer_privk).await?;
//let cnx = Arc::new();
//let (priv_key, pub_key) = generate_keypair();
// IpAddr::from_str("127.0.0.1");
log!("CONNECTING");
let mut connection = cnx
.open(

@ -1,4 +1,15 @@
static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b";
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
//static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b";
use std::collections::HashMap;
use std::fmt;

@ -1,3 +1,14 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
pub mod types;
pub mod errors;

@ -1,3 +1,14 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::log;
use async_std::task;
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt};

Loading…
Cancel
Save