diff --git a/Cargo.lock b/Cargo.lock index abbafaf..327cbaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1121,6 +1121,7 @@ dependencies = [ "async-std", "p2p-broker", "p2p-net", + "p2p-repo", ] [[package]] @@ -1212,6 +1213,7 @@ dependencies = [ "futures", "getrandom 0.2.8", "hex", + "p2p-client-ws", "p2p-net", "p2p-repo", "p2p-stores-lmdb", @@ -1278,6 +1280,7 @@ dependencies = [ "blake3", "debug_print", "futures", + "getrandom 0.2.8", "gloo-timers", "noise-protocol", "noise-rust-crypto", diff --git a/ng-app-js/src/lib.rs b/ng-app-js/src/lib.rs index f7d085f..4d7139b 100644 --- a/ng-app-js/src/lib.rs +++ b/ng-app-js/src/lib.rs @@ -1,4 +1,5 @@ use async_std::task; +#[cfg(target_arch = "wasm32")] use js_sys::Reflect; #[cfg(target_arch = "wasm32")] use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket; @@ -6,6 +7,7 @@ use p2p_net::broker::*; use p2p_net::types::{DirectPeerId, IP}; use p2p_net::utils::{spawn_and_log_error, ResultSend}; use p2p_net::{log, sleep}; +use p2p_repo::types::PubKey; use p2p_repo::utils::generate_keypair; use std::net::IpAddr; use std::str::FromStr; @@ -23,8 +25,16 @@ pub async fn greet(name: &str) { log!("I say: {}", name); let mut random_buf = [0u8; 32]; getrandom::getrandom(&mut random_buf).unwrap(); + //spawn_and_log_error(testt("ws://127.0.0.1:3012")); async fn method() -> ResultSend<()> { + let pubkey_null: [u8; 32] = [ + 59, 106, 39, 188, 206, 182, 164, 45, 98, 163, 168, 208, 42, 111, 13, 115, 101, 50, 21, + 119, 29, 226, 67, 166, 58, 192, 72, 161, 139, 89, 218, 41, + ]; + + let server_key = PubKey::Ed25519PubKey(pubkey_null); + log!("start connecting"); //let cnx = Arc::new(); let (priv_key, pub_key) = generate_keypair(); @@ -37,7 +47,7 @@ pub async fn greet(name: &str) { None, priv_key, pub_key, - pub_key, + server_key, ) .await; log!("broker.connect : {:?}", res); diff --git a/ngd/Cargo.toml b/ngd/Cargo.toml index 88b2842..cad8912 100644 --- a/ngd/Cargo.toml +++ b/ngd/Cargo.toml @@ -10,4 +10,5 @@ repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs" [dependencies] p2p-broker = { path = "../p2p-broker" } p2p-net = { path = "../p2p-net" } +p2p-repo = { path = "../p2p-repo" } async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/ngd/src/main.rs b/ngd/src/main.rs index cd76ac4..4b8f56d 100644 --- a/ngd/src/main.rs +++ b/ngd/src/main.rs @@ -9,12 +9,26 @@ use p2p_broker::server_ws::run_server; use p2p_net::WS_PORT; +use p2p_repo::{ + types::{PrivKey, PubKey}, + utils::generate_keypair, +}; #[async_std::main] async fn main() -> std::io::Result<()> { println!("Starting NextGraph daemon..."); - - run_server(format!("127.0.0.1:{}", WS_PORT).as_str()).await?; + //let keys = generate_keypair(); + //println!("Public key of node: {:?}", keys.1); + //println!("Private key of node: {:?}", keys.0); + let pubkey = PubKey::Ed25519PubKey([ + 158, 209, 118, 156, 133, 101, 241, 72, 91, 80, 160, 184, 201, 66, 245, 2, 91, 16, 10, 143, + 50, 206, 222, 187, 24, 122, 51, 59, 214, 132, 169, 154, + ]); + let privkey = PrivKey::Ed25519PrivKey([ + 254, 127, 162, 204, 53, 25, 141, 12, 4, 118, 23, 42, 52, 246, 37, 52, 76, 11, 176, 219, 31, + 241, 25, 73, 199, 118, 209, 85, 159, 234, 31, 206, + ]); + run_server(format!("127.0.0.1:{}", WS_PORT).as_str(), privkey, pubkey).await?; Ok(()) } diff --git a/p2p-broker/Cargo.toml b/p2p-broker/Cargo.toml index 9b1f3b7..94923fb 100644 --- a/p2p-broker/Cargo.toml +++ b/p2p-broker/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs" debug_print = "1.0.0" p2p-repo = { path = "../p2p-repo" } p2p-net = { path = "../p2p-net" } +p2p-client-ws = { path = "../p2p-client-ws" } p2p-stores-lmdb = { path = "../p2p-stores-lmdb" } chacha20 = "0.9.0" serde = { version = "1.0", features = ["derive"] } diff --git a/p2p-broker/src/server_ws.rs b/p2p-broker/src/server_ws.rs index 7f39aa3..a88ee9c 100644 --- a/p2p-broker/src/server_ws.rs +++ b/p2p-broker/src/server_ws.rs @@ -20,6 +20,12 @@ use async_tungstenite::accept_async; use async_tungstenite::tungstenite::protocol::Message; use debug_print::*; use futures::{SinkExt, StreamExt}; +use p2p_client_ws::remote_ws::ConnectionWebSocket; +use p2p_net::broker::*; +use p2p_net::connection::IAccept; +use p2p_net::types::IP; +use p2p_repo::types::{PrivKey, PubKey}; +use p2p_repo::utils::generate_keypair; use p2p_stores_lmdb::broker_store::LmdbBrokerStore; use p2p_stores_lmdb::repo_store::LmdbRepoStore; use std::fs; @@ -131,7 +137,7 @@ pub async fn run_server_accept_one(addrs: &str) -> std::io::Result<()> { BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); let socket = TcpListener::bind(addrs).await?; - debug_println!("Listening on 127.0.0.1:3012"); + debug_println!("Listening on {}", addrs); let mut connections = socket.incoming(); let server_arc = Arc::new(server); let tcp = connections.next().await.unwrap()?; @@ -141,7 +147,11 @@ pub async fn run_server_accept_one(addrs: &str) -> std::io::Result<()> { Ok(()) } -pub async fn run_server(addrs: &str) -> std::io::Result<()> { +pub async fn run_server( + addrs: &str, + peer_priv_key: PrivKey, + peer_pub_key: PubKey, +) -> std::io::Result<()> { let root = tempfile::Builder::new() .prefix("node-daemon") .tempdir() @@ -155,11 +165,27 @@ pub async fn run_server(addrs: &str) -> std::io::Result<()> { BrokerServer::new(store, ConfigMode::Local).expect("starting broker"); let socket = TcpListener::bind(addrs).await?; + debug_println!("Listening on {}", addrs); let mut connections = socket.incoming(); let server_arc = Arc::new(server); while let Some(tcp) = connections.next().await { - let proto_handler = Arc::clone(&server_arc).protocol_handler(); - let _handle = task::spawn(connection_loop(tcp.unwrap(), proto_handler)); + let tcp = tcp.unwrap(); + let sock_addr = tcp.peer_addr().unwrap(); + let ip = sock_addr.ip(); + let mut ws = accept_async(tcp).await.unwrap(); + + let cws = ConnectionWebSocket {}; + let base = cws.accept(peer_priv_key, peer_pub_key, ws).await.unwrap(); + + //TODO FIXME get remote_peer_id from ConnectionBase (once it is available) + let (priv_key, pub_key) = generate_keypair(); + let remote_peer_id = pub_key; + + let res = BROKER + .write() + .await + .accept(base, IP::try_from(&ip).unwrap(), None, remote_peer_id) + .await; } Ok(()) } diff --git a/p2p-client-ws/src/remote_ws.rs b/p2p-client-ws/src/remote_ws.rs index e03fd81..2ff495a 100644 --- a/p2p-client-ws/src/remote_ws.rs +++ b/p2p-client-ws/src/remote_ws.rs @@ -40,12 +40,12 @@ pub struct ConnectionWebSocket {} #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] -impl IConnection for ConnectionWebSocket { +impl IConnect for ConnectionWebSocket { async fn open( &self, ip: IP, - peer_pubk: PrivKey, - peer_privk: PubKey, + peer_privk: PrivKey, + peer_pubk: PubKey, remote_peer: DirectPeerId, ) -> Result { let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); @@ -105,7 +105,7 @@ impl IConnection for ConnectionWebSocket { // })) // .await; - cnx.start_read_loop(); + cnx.start_read_loop(peer_privk, Some(remote_peer)); let s = cnx.take_sender(); let r = cnx.take_receiver(); let mut shutdown = cnx.set_shutdown(); @@ -125,6 +125,8 @@ impl IConnection for ConnectionWebSocket { debug_println!("END of WS loop"); }); + cnx.start().await; + //spawn_and_log_error(ws_loop(ws, cnx.take_sender(), cnx.take_receiver())); // @@ -159,11 +161,36 @@ impl IConnection for ConnectionWebSocket { } } } +} + +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +impl IAccept for ConnectionWebSocket { + type Socket = WebSocketStream; + async fn accept( + &self, + peer_privk: PrivKey, + peer_pubk: PubKey, + socket: Self::Socket, + ) -> Result { + let mut cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS); - async fn accept(&self) -> Result { - let cnx = ConnectionBase::new(ConnectionDir::Server, TransportProtocol::WS); + cnx.start_read_loop(peer_privk, None); + let s = cnx.take_sender(); + let r = cnx.take_receiver(); + let mut shutdown = cnx.set_shutdown(); - unimplemented!(); + let join = task::spawn(async move { + debug_println!("START of WS loop"); + + let res = ws_loop(socket, s, r).await; + + if res.is_err() { + let _ = shutdown.send(res.err().unwrap()).await; + } + debug_println!("END of WS loop"); + }); + Ok(cnx) } } @@ -237,7 +264,7 @@ async fn ws_loop( let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await; log!("CLOSE from server"); } - + return Ok(ProtocolError::Closing); } else { futures::SinkExt::send(receiver,ConnectionCommand::Msg(serde_bare::from_slice::(&msg.into_data())?)).await .map_err(|_e| NetError::IoError)?; @@ -252,9 +279,7 @@ async fn ws_loop( log!("SENDING MESSAGE {:?}", msg); match msg { ConnectionCommand::Msg(m) => { - if let ProtocolMessage::Start(s) = m { - futures::SinkExt::send(&mut stream, Message::binary(serde_bare::to_vec(&s)?)).await.map_err(|_e| NetError::IoError)?; - } + futures::SinkExt::send(&mut stream,Message::binary(serde_bare::to_vec(&m)?)).await.map_err(|_e| NetError::IoError)?; }, ConnectionCommand::Error(e) => { return Err(e); @@ -277,7 +302,8 @@ async fn ws_loop( Ok(proto_err) => { if proto_err == ProtocolError::Closing { //FIXME: remove this case - ws.close(None).await.map_err(|_e| NetError::WsError)?; + log!("ProtocolError::Closing"); + let _ = ws.close(None).await; } else if proto_err == ProtocolError::NoError { close_ws(&mut ws, &mut receiver, 1000, "").await?; } else { @@ -322,7 +348,11 @@ mod test { pub async fn test_ws() -> Result<(), NetError> { let mut random_buf = [0u8; 32]; getrandom::getrandom(&mut random_buf).unwrap(); - //spawn_and_log_error(testt("ws://127.0.0.1:3012")); + + let server_key = PubKey::Ed25519PubKey([ + 158, 209, 118, 156, 133, 101, 241, 72, 91, 80, 160, 184, 201, 66, 245, 2, 91, 16, 10, + 143, 50, 206, 222, 187, 24, 122, 51, 59, 214, 132, 169, 154, + ]); log!("start connecting"); let (priv_key, pub_key) = generate_keypair(); @@ -336,7 +366,7 @@ mod test { None, priv_key, pub_key, - pub_key, + server_key, ) .await; log!("broker.connect : {:?}", res); diff --git a/p2p-client-ws/src/remote_ws_wasm.rs b/p2p-client-ws/src/remote_ws_wasm.rs index 01a80ce..42664b8 100644 --- a/p2p-client-ws/src/remote_ws_wasm.rs +++ b/p2p-client-ws/src/remote_ws_wasm.rs @@ -33,7 +33,7 @@ pub struct ConnectionWebSocket {} #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] -impl IConnection for ConnectionWebSocket { +impl IConnect for ConnectionWebSocket { async fn open( &self, ip: IP, @@ -54,7 +54,7 @@ impl IConnection for ConnectionWebSocket { //let (mut sender_tx, sender_rx) = mpsc::unbounded(); //let (mut receiver_tx, receiver_rx) = mpsc::unbounded(); - cnx.start_read_loop(); + cnx.start_read_loop(peer_pubk, Some(remote_peer)); let mut shutdown = cnx.set_shutdown(); spawn_and_log_error(ws_loop( @@ -65,6 +65,8 @@ impl IConnection for ConnectionWebSocket { shutdown, )); + cnx.start().await; + //spawn_and_log_error(read_loop(receiver_rx, sender_tx.clone())); log!("sending..."); @@ -95,10 +97,6 @@ impl IConnection for ConnectionWebSocket { //Ok(cnx) Ok(cnx) } - - async fn accept(&self) -> Result { - !unimplemented!() - } } async fn ws_loop( @@ -135,9 +133,9 @@ async fn ws_loop( log!("SENDING MESSAGE {:?}", msg); match msg { ConnectionCommand::Msg(m) => { - if let ProtocolMessage::Start(s) = m { - stream.send(WsMessage::Binary(serde_bare::to_vec(&s)?)).await.map_err(|_e| NetError::IoError)?; - } + + stream.send(WsMessage::Binary(serde_bare::to_vec(&m)?)).await.map_err(|_e| NetError::IoError)?; + }, ConnectionCommand::Error(e) => { return Err(e); diff --git a/p2p-net/Cargo.toml b/p2p-net/Cargo.toml index efdc469..95f94d3 100644 --- a/p2p-net/Cargo.toml +++ b/p2p-net/Cargo.toml @@ -26,4 +26,11 @@ noise-protocol = "0.1.4" noise-rust-crypto = "0.5.0" [target.'cfg(target_arch = "wasm32")'.dependencies] -gloo-timers = "0.2.6" \ No newline at end of file +gloo-timers = "0.2.6" + +[target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] +version = "0.2.7" +features = ["js"] + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +getrandom = "0.2.7" \ No newline at end of file diff --git a/p2p-net/src/actors/noise.rs b/p2p-net/src/actors/noise.rs index 205204c..0c97f18 100644 --- a/p2p-net/src/actors/noise.rs +++ b/p2p-net/src/actors/noise.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::{actor::*, connection::NoiseFSM, errors::ProtocolError, types::ProtocolMessage}; use async_std::sync::Mutex; use serde::{Deserialize, Serialize}; +use std::any::{Any, TypeId}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NoiseV0 { @@ -15,6 +16,14 @@ pub enum Noise { V0(NoiseV0), } +impl Noise { + pub fn data(&self) -> &[u8] { + match self { + Noise::V0(v0) => v0.data.as_slice(), + } + } +} + // impl BrokerRequest for Noise { // fn send(&self) -> ProtocolMessage { // ProtocolMessage::Noise(self.clone()) diff --git a/p2p-net/src/actors/start.rs b/p2p-net/src/actors/start.rs index e120621..fc59914 100644 --- a/p2p-net/src/actors/start.rs +++ b/p2p-net/src/actors/start.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use std::any::{Any, TypeId}; use std::sync::Arc; -pub struct Noise3(Noise); +// pub struct Noise3(Noise); /// Start chosen protocol /// First message sent by the client diff --git a/p2p-net/src/broker.rs b/p2p-net/src/broker.rs index f5cb582..13d7885 100644 --- a/p2p-net/src/broker.rs +++ b/p2p-net/src/broker.rs @@ -158,13 +158,62 @@ impl Broker { let _ = self.shutdown_sender.send(ProtocolError::Closing).await; } + pub async fn accept( + &mut self, + mut connection: ConnectionBase, + ip: IP, + core: Option, + remote_peer_id: DirectPeerId, + ) -> Result<(), NetError> { + if self.closing { + return Err(NetError::Closing); + } + + let join = connection.take_shutdown(); + + let connected = if core.is_some() { + let dc = DirectConnection { + ip, + interface: core.clone().unwrap(), + remote_peer_id, + tp: connection.transport_protocol(), + cnx: connection, + }; + self.direct_connections.insert(ip, dc); + PeerConnection::Core(ip) + } else { + PeerConnection::Client(connection) + }; + let bpi = BrokerPeerInfo { + lastPeerAdvert: None, + connected, + }; + self.peers.insert(remote_peer_id, bpi); + + async fn watch_close( + mut join: Receiver, + remote_peer_id: DirectPeerId, + ) -> ResultSend<()> { + async move { + let res = join.next().await; + log!("SOCKET IS CLOSED {:?} {:?}", res, &remote_peer_id); + log!("REMOVED"); + BROKER.write().await.remove(&remote_peer_id); + } + .await; + Ok(()) + } + spawn_and_log_error(watch_close(join, remote_peer_id)); + Ok(()) + } + pub async fn connect( &mut self, - cnx: Box, + cnx: Box, ip: IP, core: Option, // the interface used as egress for this connection - peer_pubk: PrivKey, - peer_privk: PubKey, + peer_privk: PrivKey, + peer_pubk: PubKey, remote_peer_id: DirectPeerId, ) -> Result<(), NetError> { if self.closing { @@ -175,9 +224,9 @@ impl Broker { //IpAddr::from_str("127.0.0.1"); //cnx.open(url, peer_pubk, peer_privk).await?; //let cnx = Arc::new(); - let (priv_key, pub_key) = generate_keypair(); + //let (priv_key, pub_key) = generate_keypair(); log!("CONNECTING"); - let mut connection = cnx.open(ip, priv_key, pub_key, remote_peer_id).await?; + let mut connection = cnx.open(ip, peer_privk, peer_pubk, remote_peer_id).await?; let join = connection.take_shutdown(); @@ -202,11 +251,11 @@ impl Broker { async fn watch_close( mut join: Receiver, - cnx: Box, + cnx: Box, ip: IP, core: Option, // the interface used as egress for this connection - peer_pubk: PrivKey, - peer_privk: PubKey, + peer_privk: PrivKey, + peer_pubkey: PubKey, remote_peer_id: DirectPeerId, ) -> ResultSend<()> { async move { @@ -235,8 +284,8 @@ impl Broker { cnx, ip, core, - peer_pubk, peer_privk, + peer_pubk, remote_peer_id, )); Ok(()) diff --git a/p2p-net/src/connection.rs b/p2p-net/src/connection.rs index 27ce8f1..1a02147 100644 --- a/p2p-net/src/connection.rs +++ b/p2p-net/src/connection.rs @@ -13,10 +13,14 @@ use crate::types::*; use crate::utils::*; use async_std::stream::StreamExt; use async_std::sync::Mutex; +use debug_print::debug_println; use futures::{channel::mpsc, select, Future, FutureExt, SinkExt}; -use noise_protocol::{CipherState, HandshakeState}; +use noise_protocol::U8Array; +use noise_protocol::{patterns::noise_xk, CipherState, HandshakeState}; +use noise_rust_crypto::sensitive::Sensitive; use noise_rust_crypto::*; use p2p_repo::types::{PrivKey, PubKey}; +use serde_bare::from_slice; use unique_id::sequence::SequenceGenerator; use unique_id::Generator; use unique_id::GeneratorFromSeed; @@ -31,15 +35,26 @@ pub enum ConnectionCommand { #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] -pub trait IConnection: Send + Sync { +pub trait IConnect: Send + Sync { async fn open( &self, ip: IP, - peer_pubk: PrivKey, - peer_privk: PubKey, + peer_privk: PrivKey, + peer_pubk: PubKey, remote_peer: DirectPeerId, ) -> Result; - async fn accept(&self) -> Result; +} + +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +pub trait IAccept: Send + Sync { + type Socket; + async fn accept( + &self, + peer_privk: PrivKey, + peer_pubk: PubKey, + socket: Self::Socket, + ) -> Result; } #[derive(PartialEq, Debug, Clone)] @@ -77,7 +92,11 @@ pub struct NoiseFSM { actors: Arc>>>, noise_handshake_state: Option>, - noise_cipher_state: Option>, + noise_cipher_state_enc: Option>, + noise_cipher_state_dec: Option>, + + from: PrivKey, + to: Option, } impl fmt::Debug for NoiseFSM { @@ -101,6 +120,8 @@ impl NoiseFSM { dir: ConnectionDir, actors: Arc>>>, sender: Sender, + from: PrivKey, + to: Option, ) -> Self { Self { state: if tp == TransportProtocol::Local { @@ -112,16 +133,34 @@ impl NoiseFSM { actors, sender, noise_handshake_state: None, - noise_cipher_state: None, + noise_cipher_state_enc: None, + noise_cipher_state_dec: None, + from, + to, } } - fn decrypt(&mut self, ciphertext: &Noise) -> ProtocolMessage { - unimplemented!(); + fn decrypt(&mut self, ciphertext: &Noise) -> Result { + let ser = self + .noise_cipher_state_dec + .as_mut() + .unwrap() + .decrypt_vec(ciphertext.data()) + .map_err(|e| ProtocolError::DecryptionError)?; + + Ok(from_slice::(&ser)?) } - fn encrypt(&mut self, plaintext: ProtocolMessage) -> Noise { - unimplemented!(); + fn encrypt(&mut self, plaintext: ProtocolMessage) -> Result { + let ser = serde_bare::to_vec(&plaintext)?; + + let cipher = self + .noise_cipher_state_enc + .as_mut() + .unwrap() + .encrypt_vec(&ser); + + Ok(Noise::V0(NoiseV0 { data: cipher })) } pub async fn remove_actor(&self, id: i64) { @@ -129,8 +168,8 @@ impl NoiseFSM { } pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> { - if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { - let cipher = self.encrypt(msg); + if self.state == FSMstate::AuthResult && self.noise_cipher_state_enc.is_some() { + let cipher = self.encrypt(msg)?; self.sender .send(ConnectionCommand::Msg(ProtocolMessage::Noise(cipher))) .await; @@ -156,13 +195,13 @@ impl NoiseFSM { // } // } - pub fn step( + pub async fn step( &mut self, mut msg_opt: Option, ) -> Result { - if self.noise_cipher_state.is_some() { + if self.noise_cipher_state_dec.is_some() { if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { - let new = self.decrypt(noise); + let new = self.decrypt(noise)?; msg_opt.replace(new); } else { return Err(ProtocolError::MustBeEncrypted); @@ -171,22 +210,157 @@ impl NoiseFSM { match self.state { // TODO verify that ID is zero FSMstate::Local0 => { - if let Some(msg) = msg_opt.as_ref() { + // CLIENT LOCAL + if !self.dir.is_server() && msg_opt.is_none() { + self.state = FSMstate::ClientHello; + Box::new(Actor::::new(0, true)); + return Ok(StepReply::NONE); + } + // SERVER LOCAL + else if let Some(msg) = msg_opt.as_ref() { if self.dir.is_server() && msg.type_id() == ClientHello::Local.type_id() { self.state = FSMstate::ServerHello; Box::new(Actor::::new(msg.id(), false)); return Ok(StepReply::NONE); } - } else if !self.dir.is_server() && msg_opt.is_none() { - self.state = FSMstate::ClientHello; - Box::new(Actor::::new(0, true)); + } + } + FSMstate::Noise0 => { + // CLIENT INITIALIZE NOISE + if !self.dir.is_server() && msg_opt.is_none() { + let mut handshake = HandshakeState::::new( + noise_xk(), + true, + &[], + Some(Sensitive::from_slice(self.from.slice())), + None, + Some(*self.to.unwrap().slice()), + None, + ); + + let payload = handshake + .write_message_vec(&[]) + .map_err(|e| ProtocolError::NoiseHandshakeFailed)?; + + let noise = Noise::V0(NoiseV0 { data: payload }); + self.sender.send(ConnectionCommand::Msg(noise.into())).await; + + self.noise_handshake_state = Some(handshake); + + self.state = FSMstate::Noise1; + return Ok(StepReply::NONE); } + // SERVER INITIALIZE NOISE + else if let Some(msg) = msg_opt.as_ref() { + if self.dir.is_server() { + if let ProtocolMessage::Noise(noise) = msg { + let mut handshake = + HandshakeState::::new( + noise_xk(), + false, + &[], + Some(Sensitive::from_slice(self.from.slice())), + None, + None, + None, + ); + + let payload = + handshake.read_message_vec(noise.data()).map_err(|e| { + debug_println!("{:?}", e); + ProtocolError::NoiseHandshakeFailed + })?; + + let noise = Noise::V0(NoiseV0 { data: payload }); + self.sender.send(ConnectionCommand::Msg(noise.into())).await; + + self.noise_handshake_state = Some(handshake); + + self.state = FSMstate::Noise2; + + return Ok(StepReply::NONE); + } + } + } + } + FSMstate::Noise1 => { + // CLIENT second round NOISE + if let Some(msg) = msg_opt.as_ref() { + if !self.dir.is_server() { + if let ProtocolMessage::Noise(noise) = msg { + let handshake = self.noise_handshake_state.as_mut().unwrap(); + + let payload = handshake + .read_message_vec(noise.data()) + .map_err(|e| ProtocolError::NoiseHandshakeFailed)?; + + if !handshake.completed() { + return Err(ProtocolError::NoiseHandshakeFailed); + } + + let noise3 = ClientHello::Noise3(Noise::V0(NoiseV0 { data: payload })); + self.sender + .send(ConnectionCommand::Msg(noise3.into())) + .await; + + let ciphers = handshake.get_ciphers(); + self.noise_cipher_state_enc = Some(ciphers.0); + self.noise_cipher_state_dec = Some(ciphers.1); + + self.noise_handshake_state = None; + + self.state = FSMstate::ClientHello; + + return Ok(StepReply::NONE); + } + } + } + } + FSMstate::Noise2 => { + // SERVER second round NOISE + if let Some(msg) = msg_opt.as_ref() { + if self.dir.is_server() { + if let ProtocolMessage::Start(StartProtocol::Client(ClientHello::Noise3( + noise, + ))) = msg + { + let handshake = self.noise_handshake_state.as_mut().unwrap(); + + let _ = handshake + .read_message_vec(noise.data()) + .map_err(|e| ProtocolError::NoiseHandshakeFailed)?; + + if !handshake.completed() { + return Err(ProtocolError::NoiseHandshakeFailed); + } + + self.to = Some(PubKey::Ed25519PubKey(handshake.get_rs().unwrap())); + + let ciphers = handshake.get_ciphers(); + self.noise_cipher_state_enc = Some(ciphers.1); + self.noise_cipher_state_dec = Some(ciphers.0); + + self.noise_handshake_state = None; + + let mut nonce_buf = [0u8; 32]; + getrandom::getrandom(&mut nonce_buf).unwrap(); + + let server_hello = ServerHello::V0(ServerHelloV0 { + nonce: nonce_buf.to_vec(), + }); + self.sender + .send(ConnectionCommand::Msg(server_hello.into())) + .await; + + self.state = FSMstate::ServerHello; + + return Ok(StepReply::NONE); + } + } + } } - FSMstate::Noise0 => {} - FSMstate::Noise1 => {} - FSMstate::Noise2 => {} - FSMstate::Noise3 => {} //set noise_handshake_state to none + FSMstate::Noise3 => {} FSMstate::ExtRequest => {} FSMstate::ExtResponse => {} FSMstate::ClientHello => {} @@ -284,64 +458,68 @@ impl ConnectionBase { ) -> ResultSend<()> { while let Some(msg) = receiver.next().await { log!("RECEIVED: {:?}", msg); - - if let ConnectionCommand::Close = msg { - log!("EXIT READ LOOP"); - break; - } else if let ConnectionCommand::Msg(proto_msg) = msg { - let res; - { - let mut locked_fsm = fsm.lock().await; - res = locked_fsm.step(Some(proto_msg)); + match msg { + ConnectionCommand::Close + | ConnectionCommand::Error(_) + | ConnectionCommand::ProtocolError(_) => { + log!("EXIT READ LOOP"); + break; } - match res { - Err(e) => { - if sender - .send(ConnectionCommand::ProtocolError(e)) - .await - .is_err() - { - break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close) - } + ConnectionCommand::Msg(proto_msg) => { + let res; + { + let mut locked_fsm = fsm.lock().await; + res = locked_fsm.step(Some(proto_msg)).await; } - Ok(StepReply::NONE) => {} - Ok(StepReply::Responder(responder)) => { - let r = responder - .get_actor() - .respond(responder, Arc::clone(&fsm)) - .await; - if r.is_err() { + match res { + Err(e) => { if sender - .send(ConnectionCommand::ProtocolError(r.unwrap_err())) + .send(ConnectionCommand::ProtocolError(e)) .await .is_err() { - break; + break; //TODO test that sending a ProtocolError effectively closes the connection (with ConnectionCommand::Close) } } - } - Ok(StepReply::Response(response)) => { - let mut lock = actors.lock().await; - let exists = lock.get_mut(&response.id()); - match exists { - Some(actor_sender) => { - if actor_sender - .send(ConnectionCommand::Msg(response)) + Ok(StepReply::NONE) => {} + Ok(StepReply::Responder(responder)) => { + let r = responder + .get_actor() + .respond(responder, Arc::clone(&fsm)) + .await; + if r.is_err() { + if sender + .send(ConnectionCommand::ProtocolError(r.unwrap_err())) .await .is_err() { break; } } - None => { - if sender - .send(ConnectionCommand::ProtocolError( - ProtocolError::ActorError, - )) - .await - .is_err() - { - break; + } + Ok(StepReply::Response(response)) => { + let mut lock = actors.lock().await; + let exists = lock.get_mut(&response.id()); + match exists { + Some(actor_sender) => { + if actor_sender + .send(ConnectionCommand::Msg(response)) + .await + .is_err() + { + break; + } + } + None => { + if sender + .send(ConnectionCommand::ProtocolError( + ProtocolError::ActorError, + )) + .await + .is_err() + { + break; + } } } } @@ -396,7 +574,19 @@ impl ConnectionBase { self.send(ConnectionCommand::Close).await; } - pub fn start_read_loop(&mut self) { + pub async fn start(&mut self) { + // BOOTSTRAP the protocol + if !self.dir.is_server() { + let res; + let fsm = self.fsm.as_ref().unwrap(); + res = fsm.lock().await.step(None).await; + if let Err(err) = res { + self.send(ConnectionCommand::ProtocolError(err)).await; + } + } + } + + pub fn start_read_loop(&mut self, from: PrivKey, to: Option) { let (sender_tx, sender_rx) = mpsc::unbounded(); let (receiver_tx, receiver_rx) = mpsc::unbounded(); self.sender = Some(sender_rx); @@ -409,6 +599,8 @@ impl ConnectionBase { self.dir.clone(), Arc::clone(&self.actors), sender_tx.clone(), + from, + to, ))); self.fsm = Some(Arc::clone(&fsm)); diff --git a/p2p-net/src/errors.rs b/p2p-net/src/errors.rs index aa41374..edebbb3 100644 --- a/p2p-net/src/errors.rs +++ b/p2p-net/src/errors.rs @@ -74,6 +74,9 @@ pub enum ProtocolError { Closing, FsmNotReady, MustBeEncrypted, + NoiseHandshakeFailed, + DecryptionError, + EncryptionError, } //MAX 949 ProtocolErrors impl ProtocolError { diff --git a/p2p-repo/src/types.rs b/p2p-repo/src/types.rs index d644f2c..bf28882 100644 --- a/p2p-repo/src/types.rs +++ b/p2p-repo/src/types.rs @@ -3,7 +3,7 @@ // This code is partly derived from work written by TG x Thoth from P2Pcollab. // Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 -// +// // or the MIT license , // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except @@ -91,6 +91,14 @@ pub enum PrivKey { Ed25519PrivKey(Ed25519PrivKey), } +impl PrivKey { + pub fn slice(&self) -> &[u8; 32] { + match self { + PrivKey::Ed25519PrivKey(o) => o, + } + } +} + /// Ed25519 signature pub type Ed25519Sig = [[u8; 32]; 2]; @@ -141,17 +149,32 @@ pub type BloomFilter1K = [[u8; 32]; 32]; /// List of Permissions pub enum PermissionType { - ADD_BRANCH, REMOVE_BRANCH, CHANGE_NAME, - ADD_MEMBER, REMOVE_MEMBER, CHANGE_PERMISSION, - TRANSACTION, SNAPSHOT, SHARING, CHANGE_ACK_CONFIG, + ADD_BRANCH, + REMOVE_BRANCH, + CHANGE_NAME, + ADD_MEMBER, + REMOVE_MEMBER, + CHANGE_PERMISSION, + TRANSACTION, + SNAPSHOT, + SHARING, + CHANGE_ACK_CONFIG, } /// List of Identity types pub enum Identity { - ORG_SITE(PubKey), PERSO_SITE(PubKey), - ORG_PUBLIC(PubKey), ORG_PROTECTED(PubKey), ORG_PRIVATE(PubKey), - PERSO_PUBLIC(PubKey), PERSO_PROTECTED(PubKey), PERSO_PRIVATE(PubKey), - GROUP(RepoId), DIALOG(RepoId), DOCUMENT(RepoId), DIALOG_OVERLAY(Digest), + ORG_SITE(PubKey), + PERSO_SITE(PubKey), + ORG_PUBLIC(PubKey), + ORG_PROTECTED(PubKey), + ORG_PRIVATE(PubKey), + PERSO_PUBLIC(PubKey), + PERSO_PROTECTED(PubKey), + PERSO_PRIVATE(PubKey), + GROUP(RepoId), + DIALOG(RepoId), + DOCUMENT(RepoId), + DIALOG_OVERLAY(Digest), } /// RepoHash: diff --git a/p2p-repo/src/utils.rs b/p2p-repo/src/utils.rs index 085176d..64539c3 100644 --- a/p2p-repo/src/utils.rs +++ b/p2p-repo/src/utils.rs @@ -3,13 +3,12 @@ // This code is partly derived from work written by TG x Thoth from P2Pcollab. // Copyright 2022 TG x Thoth // Licensed under the Apache License, Version 2.0 -// +// // or the MIT license , // at your option. All files in the project carrying such // notice may not be copied, modified, or distributed except // according to those terms. - use crate::errors::*; use crate::types::*; @@ -17,6 +16,33 @@ use ed25519_dalek::*; use rand::rngs::OsRng; use std::time::{SystemTime, UNIX_EPOCH}; +pub fn generate_null_keypair() -> (PrivKey, PubKey) { + let master_key: [u8; 32] = [0; 32]; + let sk = SecretKey::from_bytes(&master_key).unwrap(); + let pk: PublicKey = (&sk).into(); + + let keypair = Keypair { + public: pk, + secret: sk, + }; + + // println!( + // "private key: ({}) {:?}", + // keypair.secret.as_bytes().len(), + // keypair.secret.as_bytes() + // ); + // println!( + // "public key: ({}) {:?}", + // keypair.public.as_bytes().len(), + // keypair.public.as_bytes() + // ); + let ed_priv_key = keypair.secret.to_bytes(); + let ed_pub_key = keypair.public.to_bytes(); + let priv_key = PrivKey::Ed25519PrivKey(ed_priv_key); + let pub_key = PubKey::Ed25519PubKey(ed_pub_key); + (priv_key, pub_key) +} + pub fn sign( author_privkey: PrivKey, author_pubkey: PubKey,