parent
43f794e89e
commit
8c4915d9c5
@ -0,0 +1,3 @@ |
||||
pub struct ClientConnection {} |
||||
|
||||
impl ClientConnection {} |
@ -1,18 +1,20 @@ |
||||
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
|
||||
// All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0
|
||||
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
// at your option. All files in the project carrying such
|
||||
// notice may not be copied, modified, or distributed except
|
||||
// according to those terms.
|
||||
|
||||
use p2p_broker::server_ws::run_server; |
||||
|
||||
use p2p_net::WS_PORT; |
||||
|
||||
#[async_std::main] |
||||
async fn main() -> std::io::Result<()> { |
||||
println!("Starting NextGraph daemon..."); |
||||
|
||||
run_server("127.0.0.1:3012").await |
||||
run_server(format!("127.0.0.1:{}", WS_PORT).as_str()).await?; |
||||
|
||||
Ok(()) |
||||
} |
||||
|
@ -0,0 +1,3 @@ |
||||
pub struct ServerConnection {} |
||||
|
||||
impl ServerConnection {} |
@ -0,0 +1,41 @@ |
||||
[package] |
||||
name = "p2p-client-ws" |
||||
version = "0.1.0" |
||||
edition = "2021" |
||||
license = "MIT/Apache-2.0" |
||||
authors = ["Niko PLP <niko@nextgraph.org>"] |
||||
description = "P2P Client module of NextGraph" |
||||
repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs" |
||||
|
||||
[dependencies] |
||||
debug_print = "1.0.0" |
||||
p2p-repo = { path = "../p2p-repo" } |
||||
p2p-net = { path = "../p2p-net" } |
||||
chacha20 = "0.9.0" |
||||
serde = { version = "1.0", features = ["derive"] } |
||||
serde_bare = "0.5.0" |
||||
serde_bytes = "0.11.7" |
||||
async-trait = "0.1.64" |
||||
async-std = { version = "1.12.0", features = ["attributes","unstable"] } |
||||
futures = "0.3.24" |
||||
async-channel = "1.7.1" |
||||
async-oneshot = "0.5.0" |
||||
ws_stream_wasm = "0.7" |
||||
pharos = "0.5" |
||||
wasm-bindgen = "0.2" |
||||
|
||||
[dev-dependencies] |
||||
wasm-bindgen-test = "^0.3" |
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] |
||||
version = "0.2.7" |
||||
features = ["js"] |
||||
|
||||
# [target.'cfg(target_arch = "wasm32")'.dependencies] |
||||
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] |
||||
getrandom = "0.2.7" |
||||
xactor = "0.7.11" |
||||
async-tungstenite = { version = "0.17.2", features = ["async-std-runtime"] } |
||||
p2p-client = { path = "../p2p-client" } |
@ -0,0 +1,53 @@ |
||||
// All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0
|
||||
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
// at your option. All files in the project carrying such
|
||||
// notice may not be copied, modified, or distributed except
|
||||
// according to those terms.
|
||||
|
||||
#[macro_export] |
||||
macro_rules! before { |
||||
( $self:expr, $request_id:ident, $addr:ident, $receiver:ident ) => { |
||||
let mut actor = BrokerMessageActor::new(); |
||||
let $receiver = actor.receiver(); |
||||
let mut $addr = actor |
||||
.start() |
||||
.await |
||||
.map_err(|_e| ProtocolError::ActorError)?; |
||||
|
||||
let $request_id = $addr.actor_id(); |
||||
//debug_println!("actor ID {}", $request_id);
|
||||
|
||||
{ |
||||
let mut map = $self.actors.write().expect("RwLock poisoned"); |
||||
map.insert($request_id, $addr.downgrade()); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
macro_rules! after { |
||||
( $self:expr, $request_id:ident, $addr:ident, $receiver:ident, $reply:ident ) => { |
||||
//debug_println!("waiting for reply");
|
||||
|
||||
$addr.wait_for_stop().await; // TODO add timeout and close connection if there's no reply
|
||||
let r = $receiver.await; |
||||
if r.is_err() { |
||||
return Err(ProtocolError::Closing); |
||||
} |
||||
let $reply = r.unwrap(); |
||||
//debug_println!("reply arrived {:?}", $reply);
|
||||
{ |
||||
let mut map = $self.actors.write().expect("RwLock poisoned"); |
||||
map.remove(&$request_id); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
//pub mod connection_ws;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))] |
||||
pub mod remote_ws; |
||||
|
||||
#[cfg(target_arch = "wasm32")] |
||||
pub mod remote_ws_wasm; |
@ -0,0 +1,330 @@ |
||||
/* |
||||
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers |
||||
* All rights reserved. |
||||
* Licensed under the Apache License, Version 2.0 |
||||
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
* at your option. All files in the project carrying such |
||||
* notice may not be copied, modified, or distributed except |
||||
* according to those terms. |
||||
*/ |
||||
|
||||
//! WebSocket Remote Connection to a Broker
|
||||
|
||||
use std::sync::Arc; |
||||
|
||||
use async_std::net::TcpStream; |
||||
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; |
||||
use async_tungstenite::tungstenite::protocol::CloseFrame; |
||||
use async_tungstenite::WebSocketStream; |
||||
use debug_print::*; |
||||
|
||||
use async_std::sync::Mutex; |
||||
use futures::io::Close; |
||||
use futures::FutureExt; |
||||
use futures::{future, pin_mut, select, stream, StreamExt}; |
||||
|
||||
use async_std::task; |
||||
use p2p_net::errors::*; |
||||
use p2p_net::log; |
||||
use p2p_net::types::*; |
||||
use p2p_net::utils::{spawn_and_log_error, ResultSend}; |
||||
use p2p_net::{connection::*, WS_PORT}; |
||||
use p2p_repo::types::*; |
||||
use p2p_repo::utils::{generate_keypair, now_timestamp}; |
||||
|
||||
use async_tungstenite::async_std::connect_async; |
||||
use async_tungstenite::tungstenite::{Error, Message}; |
||||
|
||||
pub struct ConnectionWebSocket {} |
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
||||
impl IConnection for ConnectionWebSocket { |
||||
async fn open( |
||||
self: Arc<Self>, |
||||
ip: IP, |
||||
peer_pubk: PrivKey, |
||||
peer_privk: PubKey, |
||||
remote_peer: DirectPeerId, |
||||
) -> Result<(), NetError> { |
||||
let mut cnx = ConnectionBase::new(ConnectionDir::Client); |
||||
|
||||
let url = format!("ws://{}:{}", ip, WS_PORT); |
||||
|
||||
let res = connect_async(url).await; |
||||
|
||||
match (res) { |
||||
Err(e) => { |
||||
debug_println!("Cannot connect: {:?}", e); |
||||
Err(NetError::ConnectionError) |
||||
} |
||||
Ok((mut websocket, _)) => { |
||||
//let ws = Arc::new(Mutex::new(Box::pin(websocket)));
|
||||
|
||||
// let (write, read) = ws.split();
|
||||
// let mut stream_read = read.map(|msg_res| match msg_res {
|
||||
// Err(e) => {
|
||||
// debug_println!("READ ERROR {:?}", e);
|
||||
// ConnectionCommand::Error(NetError::IoError)
|
||||
// }
|
||||
// Ok(message) => {
|
||||
// if message.is_close() {
|
||||
// debug_println!("CLOSE FROM SERVER");
|
||||
// ConnectionCommand::Close
|
||||
// } else {
|
||||
// ConnectionCommand::Msg(
|
||||
// serde_bare::from_slice::<ProtocolMessage>(&message.into_data())
|
||||
// .unwrap(),
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
// async fn write_transform(cmd: ConnectionCommand) -> Result<Message, Error> {
|
||||
// match cmd {
|
||||
// ConnectionCommand::Error(_) => Err(Error::AlreadyClosed), //FIXME
|
||||
// ConnectionCommand::ProtocolError(_) => Err(Error::AlreadyClosed), //FIXME
|
||||
// ConnectionCommand::Close => {
|
||||
// // todo close cnx. }
|
||||
// Err(Error::AlreadyClosed)
|
||||
// }
|
||||
// ConnectionCommand::Msg(msg) => Ok(Message::binary(
|
||||
// serde_bare::to_vec(&msg)
|
||||
// .map_err(|_| Error::AlreadyClosed) //FIXME
|
||||
// .unwrap(),
|
||||
// )),
|
||||
// }
|
||||
// }
|
||||
// let stream_write = write
|
||||
// .with(|message| write_transform(message))
|
||||
// .sink_map_err(|e| NetError::IoError);
|
||||
|
||||
// ws.close(Some(CloseFrame {
|
||||
// code: CloseCode::Library(4000),
|
||||
// reason: std::borrow::Cow::Borrowed(""),
|
||||
// }))
|
||||
// .await;
|
||||
|
||||
cnx.start_read_loop(); |
||||
let s = cnx.take_sender(); |
||||
let r = cnx.take_receiver(); |
||||
|
||||
//let ws_in_task = Arc::clone(&ws);
|
||||
task::spawn(async move { |
||||
debug_println!("START of WS loop"); |
||||
//let w = ws_in_task.lock().await;
|
||||
ws_loop(websocket, s, r).await; |
||||
// .close(Some(CloseFrame {
|
||||
// code: CloseCode::Library(4000),
|
||||
// reason: std::borrow::Cow::Borrowed(""),
|
||||
// }))
|
||||
// .await;
|
||||
debug_println!("END of WS loop"); |
||||
}); |
||||
|
||||
//spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver()));
|
||||
|
||||
log!("sending..."); |
||||
// cnx.send(ConnectionCommand::Close).await;
|
||||
|
||||
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
|
||||
// StartProtocol::Auth(ClientHello::V0()),
|
||||
// )))
|
||||
// .await;
|
||||
|
||||
//cnx.close().await;
|
||||
|
||||
// let _ = cnx.inject(last_command).await;
|
||||
// let _ = cnx.close_streams().await;
|
||||
|
||||
// Note that since WsMeta::connect resolves to an opened connection, we don't see
|
||||
// any Open events here.
|
||||
//
|
||||
//assert!(evts.next().await.unwrap_throw().is_closing());
|
||||
|
||||
// TODO wait for close
|
||||
|
||||
//log!("WS closed {:?}", last_event.clone());
|
||||
|
||||
//Ok(cnx)
|
||||
Ok(()) |
||||
} |
||||
} |
||||
} |
||||
|
||||
async fn accept(&mut self) -> Result<(), NetError> { |
||||
let cnx = ConnectionBase::new(ConnectionDir::Server); |
||||
|
||||
Ok(()) |
||||
} |
||||
|
||||
fn tp(&self) -> TransportProtocol { |
||||
TransportProtocol::WS |
||||
} |
||||
} |
||||
|
||||
async fn close_ws( |
||||
stream: &mut WebSocketStream<TcpStream>, |
||||
receiver: &mut Sender<ConnectionCommand>, |
||||
code: u16, |
||||
reason: &str, |
||||
) -> Result<(), NetError> { |
||||
log!("close_ws"); |
||||
let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await; |
||||
stream |
||||
.close(Some(CloseFrame { |
||||
code: CloseCode::Library(code), |
||||
reason: std::borrow::Cow::Borrowed(reason), |
||||
})) |
||||
.await |
||||
.map_err(|_e| NetError::WsError)?; |
||||
Ok(()) |
||||
} |
||||
|
||||
async fn ws_loop( |
||||
mut ws: WebSocketStream<TcpStream>, |
||||
sender: Receiver<ConnectionCommand>, |
||||
mut receiver: Sender<ConnectionCommand>, |
||||
) -> Result<(), NetError> { |
||||
async fn inner_loop( |
||||
stream: &mut WebSocketStream<TcpStream>, |
||||
mut sender: Receiver<ConnectionCommand>, |
||||
receiver: &mut Sender<ConnectionCommand>, |
||||
) -> Result<ProtocolError, NetError> { |
||||
//let mut rx_sender = sender.fuse();
|
||||
pin_mut!(stream); |
||||
loop { |
||||
select! { |
||||
r = stream.next().fuse() => match r { |
||||
Some(Ok(msg)) => { |
||||
log!("GOT MESSAGE {:?}", msg); |
||||
|
||||
if msg.is_close() { |
||||
if let Message::Close(Some(cf)) = msg { |
||||
log!("CLOSE from server: {}",cf.reason); |
||||
let last_command = match cf.code { |
||||
CloseCode::Normal => |
||||
ConnectionCommand::Close, |
||||
CloseCode::Library(c) => { |
||||
if c < 4950 { |
||||
ConnectionCommand::ProtocolError( |
||||
ProtocolError::try_from(c - 4000).unwrap(), |
||||
) |
||||
} else { |
||||
ConnectionCommand::Error(NetError::try_from(c - 4949).unwrap()) |
||||
} |
||||
}, |
||||
_ => ConnectionCommand::Error(NetError::WsError) |
||||
}; |
||||
let _ = futures::SinkExt::send(receiver, last_command).await; |
||||
} |
||||
else { |
||||
let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await; |
||||
log!("CLOSE from server"); |
||||
} |
||||
|
||||
} else { |
||||
futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&msg.into_data())?)).await |
||||
.map_err(|_e| NetError::IoError)?; |
||||
} |
||||
return Ok(ProtocolError::Closing); |
||||
}, |
||||
Some(Err(e)) => break, |
||||
None => break
|
||||
}, |
||||
s = sender.next().fuse() => match s { |
||||
Some(msg) => { |
||||
log!("SENDING MESSAGE {:?}", msg); |
||||
match msg { |
||||
ConnectionCommand::Msg(m) => { |
||||
if let ProtocolMessage::Start(s) = m { |
||||
futures::SinkExt::send(&mut stream, Message::binary(serde_bare::to_vec(&s)?)).await.map_err(|_e| NetError::IoError)?; |
||||
} |
||||
}, |
||||
ConnectionCommand::Error(e) => { |
||||
return Err(e); |
||||
}, |
||||
ConnectionCommand::ProtocolError(e) => { |
||||
return Ok(e); |
||||
}, |
||||
ConnectionCommand::Close => { |
||||
break; |
||||
} |
||||
} |
||||
}, |
||||
None => break
|
||||
}, |
||||
} |
||||
} |
||||
Ok(ProtocolError::NoError) |
||||
} |
||||
match inner_loop(&mut ws, sender, &mut receiver).await { |
||||
Ok(proto_err) => { |
||||
if proto_err == ProtocolError::Closing { |
||||
ws.close(None).await.map_err(|_e| NetError::WsError)?; |
||||
} else if proto_err == ProtocolError::NoError { |
||||
close_ws(&mut ws, &mut receiver, 1000, "").await?; |
||||
} else { |
||||
let mut code = proto_err.clone() as u16; |
||||
if code > 949 { |
||||
code = ProtocolError::OtherError as u16; |
||||
} |
||||
close_ws(&mut ws, &mut receiver, code + 4000, &proto_err.to_string()).await?; |
||||
return Err(NetError::ProtocolError); |
||||
} |
||||
} |
||||
Err(e) => { |
||||
close_ws( |
||||
&mut ws, |
||||
&mut receiver, |
||||
e.clone() as u16 + 4949, |
||||
&e.to_string(), |
||||
) |
||||
.await?; |
||||
return Err(e); |
||||
} |
||||
} |
||||
log!("END OF LOOP"); |
||||
Ok(()) |
||||
} |
||||
|
||||
#[cfg(test)] |
||||
mod test { |
||||
|
||||
use crate::remote_ws::*; |
||||
use async_std::task; |
||||
use p2p_net::broker::*; |
||||
use p2p_net::errors::NetError; |
||||
use p2p_net::log; |
||||
use p2p_net::types::IP; |
||||
use p2p_net::utils::{spawn_and_log_error, ResultSend}; |
||||
use p2p_repo::utils::generate_keypair; |
||||
use std::net::IpAddr; |
||||
use std::str::FromStr; |
||||
|
||||
#[async_std::test] |
||||
pub async fn test_ws() -> Result<(), NetError> { |
||||
let mut random_buf = [0u8; 32]; |
||||
getrandom::getrandom(&mut random_buf).unwrap(); |
||||
//spawn_and_log_error(testt("ws://127.0.0.1:3012"));
|
||||
|
||||
log!("start connecting"); |
||||
let cnx = Arc::new(ConnectionWebSocket {}); |
||||
let (priv_key, pub_key) = generate_keypair(); |
||||
let broker = Broker::new(); |
||||
let res = broker |
||||
.connect( |
||||
cnx, |
||||
IP::try_from(&IpAddr::from_str("127.0.0.1").unwrap()).unwrap(), |
||||
None, |
||||
priv_key, |
||||
pub_key, |
||||
pub_key, |
||||
) |
||||
.await; |
||||
log!("broker.connect : {:?}", res); |
||||
//res.expect_throw("assume the connection succeeds");
|
||||
|
||||
Ok(()) |
||||
} |
||||
} |
@ -0,0 +1,198 @@ |
||||
/* |
||||
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers |
||||
* All rights reserved. |
||||
* Licensed under the Apache License, Version 2.0 |
||||
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
* at your option. All files in the project carrying such |
||||
* notice may not be copied, modified, or distributed except |
||||
* according to those terms. |
||||
*/ |
||||
|
||||
//! WebSocket for Wasm Remote Connection to a Broker
|
||||
|
||||
use futures::FutureExt; |
||||
use futures::{future, pin_mut, select, stream, SinkExt, StreamExt}; |
||||
use p2p_net::connection::*; |
||||
use p2p_net::errors::*; |
||||
use p2p_net::log; |
||||
use p2p_net::types::*; |
||||
use p2p_net::utils::*; |
||||
use p2p_net::{connection::*, WS_PORT}; |
||||
use p2p_repo::types::*; |
||||
use p2p_repo::utils::{generate_keypair, now_timestamp}; |
||||
use std::sync::Arc; |
||||
|
||||
use { |
||||
pharos::{Filter, Observable, ObserveConfig}, |
||||
wasm_bindgen::UnwrapThrowExt, |
||||
ws_stream_wasm::*, |
||||
}; |
||||
|
||||
pub struct ConnectionWebSocket {} |
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
||||
impl IConnection for ConnectionWebSocket { |
||||
fn tp(&self) -> TransportProtocol { |
||||
TransportProtocol::WS |
||||
} |
||||
|
||||
async fn open( |
||||
self: Arc<Self>, |
||||
ip: IP, |
||||
peer_pubk: PrivKey, |
||||
peer_privk: PubKey, |
||||
remote_peer: DirectPeerId, |
||||
) -> Result<(), NetError> { |
||||
//pub async fn testt(url: &str) -> ResultSend<()> {
|
||||
let mut cnx = ConnectionBase::new(ConnectionDir::Client); |
||||
|
||||
let url = format!("ws://{}:{}", ip, WS_PORT); |
||||
|
||||
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { |
||||
//log!("{:?}", e);
|
||||
NetError::ConnectionError |
||||
})?; |
||||
|
||||
let mut evts = ws |
||||
.observe(ObserveConfig::default()) |
||||
//.observe(Filter::Pointer(WsEvent::is_closed).into())
|
||||
.await |
||||
.expect_throw("observe"); |
||||
|
||||
//let (mut sender_tx, sender_rx) = mpsc::unbounded();
|
||||
//let (mut receiver_tx, receiver_rx) = mpsc::unbounded();
|
||||
|
||||
cnx.start_read_loop(); |
||||
|
||||
spawn_and_log_error(ws_loop(ws, wsio, cnx.take_sender(), cnx.take_receiver())); |
||||
|
||||
//spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone()));
|
||||
|
||||
log!("sending..."); |
||||
// cnx.send(ConnectionCommand::Close).await;
|
||||
|
||||
// cnx.send(ConnectionCommand::Msg(ProtocolMessage::Start(
|
||||
// StartProtocol::Auth(ClientHello::V0()),
|
||||
// )))
|
||||
// .await;
|
||||
|
||||
cnx.close().await; |
||||
|
||||
// Note that since WsMeta::connect resolves to an opened connection, we don't see
|
||||
// any Open events here.
|
||||
//
|
||||
//assert!(evts.next().await.unwrap_throw().is_closing());
|
||||
let last_event = evts.next().await; |
||||
log!("WS closed {:?}", last_event.clone()); |
||||
|
||||
let last_command = match last_event { |
||||
None => ConnectionCommand::Close, |
||||
Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen
|
||||
Some(WsEvent::Error) => ConnectionCommand::Error(NetError::ConnectionError), |
||||
Some(WsEvent::Closing) => ConnectionCommand::Close, |
||||
Some(WsEvent::Closed(ce)) => { |
||||
if ce.code == 1000 { |
||||
ConnectionCommand::Close |
||||
} else if ce.code < 4000 { |
||||
ConnectionCommand::Error(NetError::WsError) |
||||
} else if ce.code < 4950 { |
||||
ConnectionCommand::ProtocolError( |
||||
ProtocolError::try_from(ce.code - 4000).unwrap(), |
||||
) |
||||
} else { |
||||
ConnectionCommand::Error(NetError::try_from(ce.code - 4949).unwrap()) |
||||
} |
||||
} |
||||
Some(WsEvent::WsErr(_e)) => ConnectionCommand::Error(NetError::WsError), |
||||
}; |
||||
let _ = cnx.inject(last_command).await; |
||||
let _ = cnx.close_streams().await; |
||||
|
||||
//Ok(cnx)
|
||||
Ok(()) |
||||
} |
||||
|
||||
async fn accept(&mut self) -> Result<(), NetError> { |
||||
!unimplemented!() |
||||
} |
||||
} |
||||
|
||||
async fn ws_loop( |
||||
ws: WsMeta, |
||||
mut stream: WsStream, |
||||
sender: Receiver<ConnectionCommand>, |
||||
receiver: Sender<ConnectionCommand>, |
||||
) -> ResultSend<()> { |
||||
async fn inner_loop( |
||||
stream: &mut WsStream, |
||||
mut sender: Receiver<ConnectionCommand>, |
||||
mut receiver: Sender<ConnectionCommand>, |
||||
) -> Result<ProtocolError, NetError> { |
||||
//let mut rx_sender = sender.fuse();
|
||||
loop { |
||||
select! { |
||||
r = stream.next().fuse() => match r { |
||||
Some(msg) => { |
||||
log!("GOT MESSAGE {:?}", msg); |
||||
if let WsMessage::Binary(b) = msg { |
||||
receiver.send(ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&b)?)).await |
||||
.map_err(|_e| NetError::IoError)?; |
||||
} |
||||
else { |
||||
break; |
||||
} |
||||
}, |
||||
None => break
|
||||
}, |
||||
s = sender.next().fuse() => match s { |
||||
Some(msg) => { |
||||
log!("SENDING MESSAGE {:?}", msg); |
||||
match msg { |
||||
ConnectionCommand::Msg(m) => { |
||||
if let ProtocolMessage::Start(s) = m { |
||||
stream.send(WsMessage::Binary(serde_bare::to_vec(&s)?)).await.map_err(|_e| NetError::IoError)?; |
||||
} |
||||
}, |
||||
ConnectionCommand::Error(e) => { |
||||
return Err(e); |
||||
}, |
||||
ConnectionCommand::ProtocolError(e) => { |
||||
return Ok(e); |
||||
}, |
||||
ConnectionCommand::Close => { |
||||
break; |
||||
} |
||||
} |
||||
}, |
||||
None => break
|
||||
}, |
||||
} |
||||
} |
||||
Ok(ProtocolError::NoError) |
||||
} |
||||
match inner_loop(&mut stream, sender, receiver).await { |
||||
Ok(proto_err) => { |
||||
if proto_err == ProtocolError::NoError { |
||||
ws.close_code(1000).await.map_err(|_e| NetError::WsError)?; |
||||
} else { |
||||
let mut code = proto_err.clone() as u16; |
||||
if code > 949 { |
||||
code = ProtocolError::OtherError as u16; |
||||
} |
||||
ws.close_reason(code + 4000, proto_err.to_string()) |
||||
.await |
||||
.map_err(|_e| NetError::WsError)?; |
||||
return Err(Box::new(proto_err)); |
||||
} |
||||
} |
||||
Err(e) => { |
||||
ws.close_reason(e.clone() as u16 + 4949, e.to_string()) |
||||
.await |
||||
.map_err(|_e| NetError::WsError)?; |
||||
return Err(Box::new(e)); |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
@ -0,0 +1,60 @@ |
||||
use futures::{channel::mpsc, SinkExt}; |
||||
use serde::de::DeserializeOwned; |
||||
|
||||
use crate::{connection::*, errors::ProtocolError}; |
||||
use std::marker::PhantomData; |
||||
|
||||
pub trait BrokerRequest: DeserializeOwned {} |
||||
|
||||
pub trait BrokerResponse: DeserializeOwned { |
||||
fn test(&self); |
||||
} |
||||
|
||||
impl BrokerResponse for () { |
||||
fn test(&self) {} |
||||
} |
||||
|
||||
pub trait IActor: EActor { |
||||
fn process_request(&self) {} |
||||
} |
||||
|
||||
#[async_trait::async_trait] |
||||
pub trait EActor { |
||||
async fn handle(&mut self, cmd: ConnectionCommand); |
||||
} |
||||
|
||||
pub struct Actor<'a, A: BrokerRequest, B: BrokerResponse> { |
||||
id: i64, |
||||
phantom_a: PhantomData<&'a A>, |
||||
phantom_b: PhantomData<&'a B>, |
||||
receiver: Receiver<ConnectionCommand>, |
||||
receiver_tx: Sender<ConnectionCommand>, |
||||
} |
||||
|
||||
#[async_trait::async_trait] |
||||
impl<A: BrokerRequest + std::marker::Sync, B: BrokerResponse + std::marker::Sync> EActor |
||||
for Actor<'_, A, B> |
||||
{ |
||||
async fn handle(&mut self, cmd: ConnectionCommand) { |
||||
let _ = self.receiver_tx.send(cmd).await; |
||||
} |
||||
} |
||||
|
||||
impl<A: BrokerRequest, B: BrokerResponse> Actor<'_, A, B> { |
||||
pub fn new(id: i64) -> Self { |
||||
let (mut receiver_tx, receiver) = mpsc::unbounded::<ConnectionCommand>(); |
||||
Self { |
||||
id, |
||||
receiver, |
||||
receiver_tx, |
||||
phantom_a: PhantomData, |
||||
phantom_b: PhantomData, |
||||
} |
||||
} |
||||
|
||||
pub fn request(&self, msg: A, stream: Option<A>) -> Result<B, ProtocolError> { |
||||
let b: Vec<u8> = vec![]; |
||||
let a = serde_bare::from_slice::<B>(&b).unwrap(); |
||||
Ok(a) |
||||
} |
||||
} |
@ -0,0 +1,2 @@ |
||||
pub mod noise; |
||||
pub use noise::*; |
@ -0,0 +1,20 @@ |
||||
use crate::{actor::*, errors::ProtocolError}; |
||||
use serde::{Deserialize, Serialize}; |
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)] |
||||
pub struct NoiseV0 { |
||||
data: Vec<u8>, |
||||
} |
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)] |
||||
pub enum Noise { |
||||
V0(NoiseV0), |
||||
} |
||||
|
||||
impl BrokerRequest for Noise {} |
||||
|
||||
impl Actor<'_, Noise, ()> {} |
||||
|
||||
impl IActor for Actor<'_, Noise, ()> { |
||||
//fn process_request(&self) {}
|
||||
} |
@ -0,0 +1,84 @@ |
||||
use crate::connection::*; |
||||
use crate::errors::*; |
||||
use crate::types::*; |
||||
use crate::utils::ResultSend; |
||||
use p2p_repo::types::{PrivKey, PubKey}; |
||||
use p2p_repo::utils::generate_keypair; |
||||
use std::collections::HashMap; |
||||
use std::net::IpAddr; |
||||
use std::sync::{Arc, RwLock}; |
||||
|
||||
use crate::actor::*; |
||||
|
||||
pub enum PeerConnection { |
||||
Core(IP), |
||||
Client(Box<Arc<dyn IConnection>>), |
||||
NONE, |
||||
} |
||||
|
||||
pub struct BrokerPeerInfo { |
||||
lastPeerAdvert: Option<PeerAdvert>, //FIXME: remove Option
|
||||
connected: PeerConnection, |
||||
} |
||||
|
||||
pub struct DirectConnection { |
||||
ip: IP, |
||||
interface: String, |
||||
remote_peer_id: DirectPeerId, |
||||
tp: TransportProtocol, |
||||
//dir: ConnectionDir,
|
||||
cnx: Box<Arc<dyn IConnection>>, |
||||
} |
||||
|
||||
pub struct Broker { |
||||
//actors: Arc<RwLock<HashMap<i64, Box<dyn IActor>>>>,
|
||||
direct_connections: Arc<RwLock<HashMap<IP, DirectConnection>>>, |
||||
peers: Arc<RwLock<HashMap<DirectPeerId, BrokerPeerInfo>>>, |
||||
} |
||||
|
||||
impl Broker { |
||||
pub fn new() -> Self { |
||||
Broker { |
||||
direct_connections: Arc::new(RwLock::new(HashMap::new())), |
||||
peers: Arc::new(RwLock::new(HashMap::new())), |
||||
} |
||||
} |
||||
|
||||
pub async fn connect( |
||||
&self, |
||||
cnx: Arc<dyn IConnection>, |
||||
ip: IP, |
||||
core: Option<String>, |
||||
peer_pubk: PrivKey, |
||||
peer_privk: PubKey, |
||||
remote_peer_id: DirectPeerId, |
||||
) -> Result<(), NetError> { |
||||
// TODO check that not already connected to peer
|
||||
//IpAddr::from_str("127.0.0.1");
|
||||
//cnx.open(url, peer_pubk, peer_privk).await?;
|
||||
//let cnx = Arc::new();
|
||||
let (priv_key, pub_key) = generate_keypair(); |
||||
Arc::clone(&cnx) |
||||
.open(ip, priv_key, pub_key, remote_peer_id) |
||||
.await?; |
||||
let connected = if core.is_some() { |
||||
let dc = DirectConnection { |
||||
ip, |
||||
interface: core.unwrap(), |
||||
remote_peer_id, |
||||
tp: cnx.tp(), |
||||
cnx: Box::new(Arc::clone(&cnx)), |
||||
}; |
||||
self.direct_connections.write().unwrap().insert(ip, dc); |
||||
PeerConnection::Core(ip) |
||||
} else { |
||||
PeerConnection::Client(Box::new(Arc::clone(&cnx))) |
||||
}; |
||||
let bpi = BrokerPeerInfo { |
||||
lastPeerAdvert: None, |
||||
connected, |
||||
}; |
||||
self.peers.write().unwrap().insert(remote_peer_id, bpi); |
||||
Ok(()) |
||||
} |
||||
} |
@ -0,0 +1,142 @@ |
||||
static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b"; |
||||
|
||||
use std::sync::Arc; |
||||
|
||||
use crate::actors::*; |
||||
use crate::errors::NetError; |
||||
use crate::errors::ProtocolError; |
||||
use crate::log; |
||||
use crate::types::*; |
||||
use crate::utils::*; |
||||
use async_std::stream::StreamExt; |
||||
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; |
||||
use p2p_repo::types::{PrivKey, PubKey}; |
||||
use unique_id::sequence::SequenceGenerator; |
||||
use unique_id::Generator; |
||||
use unique_id::GeneratorFromSeed; |
||||
|
||||
pub type Sender<T> = mpsc::UnboundedSender<T>; |
||||
pub type Receiver<T> = mpsc::UnboundedReceiver<T>; |
||||
|
||||
#[derive(Debug)] |
||||
pub enum ConnectionCommand { |
||||
Msg(ProtocolMessage), |
||||
Error(NetError), |
||||
ProtocolError(ProtocolError), |
||||
Close, |
||||
} |
||||
|
||||
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] |
||||
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
||||
pub trait IConnection { |
||||
async fn open( |
||||
self: Arc<Self>, |
||||
ip: IP, |
||||
peer_pubk: PrivKey, |
||||
peer_privk: PubKey, |
||||
remote_peer: DirectPeerId, |
||||
) -> Result<(), NetError>; |
||||
async fn accept(&mut self) -> Result<(), NetError>; |
||||
fn tp(&self) -> TransportProtocol; |
||||
} |
||||
|
||||
#[derive(PartialEq)] |
||||
pub enum ConnectionDir { |
||||
Server, |
||||
Client, |
||||
} |
||||
|
||||
pub struct ConnectionBase { |
||||
sender: Option<Receiver<ConnectionCommand>>, |
||||
receiver: Option<Sender<ConnectionCommand>>, |
||||
sender_tx: Option<Sender<ConnectionCommand>>, |
||||
receiver_tx: Option<Sender<ConnectionCommand>>, |
||||
dir: ConnectionDir, |
||||
next_request_id: SequenceGenerator, |
||||
} |
||||
|
||||
impl ConnectionBase { |
||||
pub fn new(dir: ConnectionDir) -> Self { |
||||
Self { |
||||
receiver: None, |
||||
sender: None, |
||||
sender_tx: None, |
||||
receiver_tx: None, |
||||
next_request_id: SequenceGenerator::new(1), |
||||
dir, |
||||
} |
||||
} |
||||
|
||||
pub fn take_sender(&mut self) -> Receiver<ConnectionCommand> { |
||||
self.sender.take().unwrap() |
||||
} |
||||
|
||||
pub fn take_receiver(&mut self) -> Sender<ConnectionCommand> { |
||||
self.receiver.take().unwrap() |
||||
} |
||||
|
||||
pub fn guard(&mut self, dir: ConnectionDir) -> Result<(), NetError> { |
||||
if self.dir == dir { |
||||
Ok(()) |
||||
} else { |
||||
Err(NetError::DirectionAlreadySet) |
||||
} |
||||
} |
||||
|
||||
async fn read_loop( |
||||
mut receiver: Receiver<ConnectionCommand>, |
||||
mut sender: Sender<ConnectionCommand>, |
||||
) -> ResultSend<()> { |
||||
while let Some(msg) = receiver.next().await { |
||||
log!("RECEIVED: {:?}", msg); |
||||
|
||||
// sender
|
||||
// .send(ConnectionCommand::Close)
|
||||
// .await
|
||||
// .map_err(|e| "channel send error")?
|
||||
|
||||
if let ConnectionCommand::Close = msg { |
||||
log!("EXIT READ LOOP"); |
||||
break; |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub async fn request(&mut self) { |
||||
let mut id = self.next_request_id.next_id(); |
||||
if self.dir == ConnectionDir::Server { |
||||
id = !id + 1; |
||||
} |
||||
// id
|
||||
} |
||||
|
||||
pub async fn send(&mut self, cmd: ConnectionCommand) { |
||||
let _ = self.sender_tx.as_mut().unwrap().send(cmd).await; |
||||
} |
||||
|
||||
pub async fn inject(&mut self, cmd: ConnectionCommand) { |
||||
let _ = self.receiver_tx.as_mut().unwrap().send(cmd).await; |
||||
} |
||||
|
||||
pub async fn close_streams(&mut self) { |
||||
let _ = self.receiver_tx.as_mut().unwrap().close_channel(); |
||||
let _ = self.sender_tx.as_mut().unwrap().close_channel(); |
||||
} |
||||
|
||||
pub async fn close(&mut self) { |
||||
log!("closing..."); |
||||
self.send(ConnectionCommand::Close).await; |
||||
} |
||||
|
||||
pub fn start_read_loop(&mut self) { |
||||
let (sender_tx, sender_rx) = mpsc::unbounded(); |
||||
let (receiver_tx, receiver_rx) = mpsc::unbounded(); |
||||
self.sender = Some(sender_rx); |
||||
self.receiver = Some(receiver_tx.clone()); |
||||
self.sender_tx = Some(sender_tx.clone()); |
||||
self.receiver_tx = Some(receiver_tx); |
||||
|
||||
spawn_and_log_error(Self::read_loop(receiver_rx, sender_tx)); |
||||
} |
||||
} |
@ -0,0 +1,32 @@ |
||||
use crate::log; |
||||
use async_std::task; |
||||
use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; |
||||
|
||||
#[cfg(target_arch = "wasm32")] |
||||
pub fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()> |
||||
where |
||||
F: Future<Output = ResultSend<()>> + 'static, |
||||
{ |
||||
task::spawn_local(async move { |
||||
if let Err(e) = fut.await { |
||||
log!("EXCEPTION {}", e) |
||||
} |
||||
}) |
||||
} |
||||
#[cfg(target_arch = "wasm32")] |
||||
pub type ResultSend<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; |
||||
|
||||
#[cfg(not(target_arch = "wasm32"))] |
||||
pub type ResultSend<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; |
||||
|
||||
#[cfg(not(target_arch = "wasm32"))] |
||||
pub fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()> |
||||
where |
||||
F: Future<Output = ResultSend<()>> + Send + 'static, |
||||
{ |
||||
task::spawn(async move { |
||||
if let Err(e) = fut.await { |
||||
eprintln!("{}", e) |
||||
} |
||||
}) |
||||
} |
@ -1,3 +1,5 @@ |
||||
#[cfg(not(target_arch = "wasm32"))] |
||||
pub mod repo_store; |
||||
|
||||
#[cfg(not(target_arch = "wasm32"))] |
||||
pub mod broker_store; |
||||
|
Loading…
Reference in new issue