/* * Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers * All rights reserved. * Licensed under the Apache License, Version 2.0 * * or the MIT license , * at your option. All files in the project carrying such * notice may not be copied, modified, or distributed except * according to those terms. */ use async_std::task; use async_std::sync::Mutex; use futures::{ ready, stream::Stream, task::{Context, Poll}, Future, select, FutureExt, }; use futures::channel::mpsc; use std::pin::Pin; use std::{collections::HashSet, fmt::Debug}; use async_oneshot::oneshot; use debug_print::*; use futures::{pin_mut, stream, Sink, SinkExt, StreamExt}; use p2p_repo::object::*; use p2p_repo::store::*; use p2p_repo::types::*; use p2p_repo::utils::*; use p2p_net::errors::*; use p2p_net::types::*; use p2p_net::broker_connection::*; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use xactor::{message, spawn, Actor, Addr, Handler, WeakAddr}; #[message] struct BrokerMessageXActor(BrokerMessage); struct BrokerMessageActor { r: Option>, s: async_oneshot::Sender, } impl Actor for BrokerMessageActor {} impl BrokerMessageActor { fn new() -> BrokerMessageActor { let (s, r) = oneshot::(); BrokerMessageActor { r: Some(r), s } } fn resolve(&mut self, msg: BrokerMessage) { let _ = self.s.send(msg); } fn receiver(&mut self) -> async_oneshot::Receiver { self.r.take().unwrap() } } struct BrokerMessageStreamActor { r: Option>, s: async_channel::Sender, error_r: Option>>, error_s: Option>>, } impl Actor for BrokerMessageStreamActor {} impl BrokerMessageStreamActor { fn new() -> BrokerMessageStreamActor { let (s, r) = async_channel::unbounded::(); let (error_s, error_r) = oneshot::>(); BrokerMessageStreamActor { r: Some(r), s, error_r: Some(error_r), error_s: Some(error_s), } } async fn partial(&mut self, block: Block) -> Result<(), ProtocolError> { //debug_println!("GOT PARTIAL {:?}", block.id()); self.s .send(block) .await .map_err(|e| ProtocolError::WriteError) } fn receiver(&mut self) -> async_channel::Receiver { self.r.take().unwrap() } fn error_receiver(&mut self) -> async_oneshot::Receiver> { self.error_r.take().unwrap() } fn send_error(&mut self, err: Option) { if self.error_s.is_some() { let _ = self.error_s.take().unwrap().send(err); self.error_s = None; } } fn close(&mut self) { self.s.close(); } } #[async_trait::async_trait] impl Handler for BrokerMessageActor { async fn handle(&mut self, ctx: &mut xactor::Context, msg: BrokerMessageXActor) { //println!("handling {:?}", msg.0); self.resolve(msg.0); ctx.stop(None); } } #[async_trait::async_trait] impl Handler for BrokerMessageStreamActor { async fn handle(&mut self, ctx: &mut xactor::Context, msg: BrokerMessageXActor) { //println!("handling {:?}", msg.0); let res: Result, ProtocolError> = msg.0.into(); match res { Err(e) => { self.send_error(Some(e)); ctx.stop(None); self.close(); } Ok(Some(b)) => { self.send_error(None); // it must be a partial content let res = self.partial(b).await; if let Err(e) = res { ctx.stop(None); self.close(); } } Ok(None) => { self.send_error(None); ctx.stop(None); self.close(); } } } } pub struct ConnectionRemote {} impl ConnectionRemote { pub async fn ext_request< B: Stream> + StreamExt + Send + Sync, A: Sink, Error = ProtocolError> + Send, >( w: A, r: B, request: ExtRequest, ) -> Result { unimplemented!(); } async fn close(w: S, err: ProtocolError) -> ProtocolError where S: Sink, Error = ProtocolError>, { let mut writer = Box::pin(w); let _ = writer.send(vec![]); let _ = writer.close().await; err } pub async fn open_broker_connection< B: Stream> + StreamExt + Send + Sync + 'static, A: Sink, Error = ProtocolError> + Send + 'static, >( w: A, r: B, user: PubKey, user_pk: PrivKey, client: PubKey, ) -> Result { let mut writer = Box::pin(w); writer .send(serde_bare::to_vec(&StartProtocol::Auth(ClientHello::V0()))?) .await .map_err(|_e| ProtocolError::WriteError)?; let mut reader = Box::pin(r); let answer = reader.next().await; if answer.is_none() { return Err(Self::close(writer, ProtocolError::InvalidState).await); } let server_hello = serde_bare::from_slice::(&answer.unwrap())?; //debug_println!("received nonce from server: {:?}", server_hello.nonce()); let content = ClientAuthContentV0 { user, client, nonce: server_hello.nonce().clone(), }; let sig = sign(user_pk, user, &serde_bare::to_vec(&content)?) .map_err(|_e| ProtocolError::SignatureError)?; let auth_ser = serde_bare::to_vec(&ClientAuth::V0(ClientAuthV0 { content, sig }))?; //debug_println!("AUTH SENT {:?}", auth_ser); writer .send(auth_ser) .await .map_err(|_e| ProtocolError::WriteError)?; let answer = reader.next().await; if answer.is_none() { //return Err(ProtocolError::InvalidState); return Err(Self::close(writer, ProtocolError::InvalidState).await); } let auth_result = serde_bare::from_slice::(&answer.unwrap())?; match auth_result.result() { 0 => { async fn transform(message: BrokerMessage) -> Result, ProtocolError> { if message.is_close() { Ok(vec![]) } else { Ok(serde_bare::to_vec(&message)?) } } let messages_stream_write = writer.with(|message| transform(message)); let mut messages_stream_read = reader.map(|message| { if message.len() == 0 { BrokerMessage::Close } else { match serde_bare::from_slice::(&message) { Err(e) => BrokerMessage::Close, Ok(m) => m } } }); let cnx = BrokerConnectionRemote::open(messages_stream_write, messages_stream_read, user); Ok(cnx) } err => Err(Self::close(writer, ProtocolError::try_from(err).unwrap()).await), } } } pub struct BrokerConnectionRemote where T: Sink + Send + 'static, { writer: Arc>>>, user: PubKey, actors: Arc>>>, stream_actors: Arc>>>, shutdown: mpsc::UnboundedSender, } #[async_trait::async_trait] impl BrokerConnection for BrokerConnectionRemote where T: Sink + Send, { type OC = BrokerConnectionRemote; type BlockStream = async_channel::Receiver; async fn close(&mut self) { let _ = self.shutdown.close().await; let mut w = self.writer.lock().await; let _ = w.send(BrokerMessage::Close).await; let _ = w.close().await; } async fn process_overlay_request_stream_response( &mut self, overlay: OverlayId, request: BrokerOverlayRequestContentV0, ) -> Result>, ProtocolError> { let mut actor = BrokerMessageStreamActor::new(); let receiver = actor.receiver(); let error_receiver = actor.error_receiver(); let mut addr = actor .start() .await .map_err(|_e| ProtocolError::ActorError)?; let request_id = addr.actor_id(); //debug_println!("actor ID {}", request_id); { let mut map = self.stream_actors.write().expect("RwLock poisoned"); map.insert(request_id, addr.downgrade()); } let mut w = self.writer.lock().await; w.send(BrokerMessage::V0(BrokerMessageV0 { padding: vec![], //FIXME implement padding content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0( BrokerOverlayMessageV0 { overlay, content: BrokerOverlayMessageContentV0::BrokerOverlayRequest( BrokerOverlayRequest::V0(BrokerOverlayRequestV0 { id: request_id, content: request, }), ), }, )), })) .await .map_err(|_e| ProtocolError::WriteError)?; //debug_println!("waiting for first reply"); let reply = error_receiver.await; match reply { Err(_e) => { Err(ProtocolError::Closing) } Ok(Some(e)) => { let mut map = self.stream_actors.write().expect("RwLock poisoned"); map.remove(&request_id); return Err(e); } Ok(None) => { let stream_actors_in_thread = Arc::clone(&self.stream_actors); task::spawn(async move { addr.wait_for_stop().await; // TODO add timeout let mut map = stream_actors_in_thread.write().expect("RwLock poisoned"); map.remove(&request_id); }); Ok(Box::pin(receiver)) } } } async fn process_overlay_request_objectid_response( &mut self, overlay: OverlayId, request: BrokerOverlayRequestContentV0, ) -> Result { before!(self, request_id, addr, receiver); self.writer.lock().await .send(BrokerMessage::V0(BrokerMessageV0 { padding: vec![], // FIXME implement padding content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0( BrokerOverlayMessageV0 { overlay, content: BrokerOverlayMessageContentV0::BrokerOverlayRequest( BrokerOverlayRequest::V0(BrokerOverlayRequestV0 { id: request_id, content: request, }), ), }, )), })) .await .map_err(|_e| ProtocolError::WriteError)?; after!(self, request_id, addr, receiver, reply); reply.into() } async fn process_overlay_request( &mut self, overlay: OverlayId, request: BrokerOverlayRequestContentV0, ) -> Result<(), ProtocolError> { before!(self, request_id, addr, receiver); self.writer.lock().await .send(BrokerMessage::V0(BrokerMessageV0 { padding: vec![], // FIXME implement padding content: BrokerMessageContentV0::BrokerOverlayMessage(BrokerOverlayMessage::V0( BrokerOverlayMessageV0 { overlay, content: BrokerOverlayMessageContentV0::BrokerOverlayRequest( BrokerOverlayRequest::V0(BrokerOverlayRequestV0 { id: request_id, content: request, }), ), }, )), })) .await .map_err(|_e| ProtocolError::WriteError)?; after!(self, request_id, addr, receiver, reply); reply.into() } async fn add_user( &mut self, user_id: PubKey, admin_user_pk: PrivKey, ) -> Result<(), ProtocolError> { before!(self, request_id, addr, receiver); let op_content = AddUserContentV0 { user: user_id }; let sig = sign( admin_user_pk, self.user, &serde_bare::to_vec(&op_content)?, )?; self.writer.lock().await .send(BrokerMessage::V0(BrokerMessageV0 { padding: vec![], // TODO implement padding content: BrokerMessageContentV0::BrokerRequest(BrokerRequest::V0( BrokerRequestV0 { id: request_id, content: BrokerRequestContentV0::AddUser(AddUser::V0(AddUserV0 { content: op_content, sig, })), }, )), })) .await .map_err(|_e| ProtocolError::WriteError)?; after!(self, request_id, addr, receiver, reply); reply.into() } async fn del_user(&mut self, user_id: PubKey, admin_user_pk: PrivKey) {} async fn add_client(&mut self, client_id: ClientId, user_pk: PrivKey) {} async fn del_client(&mut self, client_id: ClientId, user_pk: PrivKey) {} async fn overlay_connect( &mut self, repo_link: &RepoLink, public: bool, ) -> Result>, ProtocolError> { let overlay = self.process_overlay_connect(repo_link, public).await?; Ok(OverlayConnectionClient::create(self, overlay,repo_link.clone() )) } } #[derive(Debug)] enum Void {} impl BrokerConnectionRemote where T: Sink + Send, { async fn connection_reader_loop< U: Stream + StreamExt + Send + Sync + Unpin + 'static, >( stream: U, actors: Arc>>>, stream_actors: Arc>>>, shutdown: mpsc::UnboundedReceiver, ) -> Result<(), ProtocolError> { let mut s = stream.fuse(); let mut shutdown = shutdown.fuse(); loop { select! { void = shutdown.next().fuse() => match void { Some(void) => match void {}, None => break, }, message = s.next().fuse() => match message { Some(message) => { //debug_println!("GOT MESSAGE {:?}", message); if message.is_close() { // releasing the blocking calls on the actors let map = actors.read().expect("RwLock poisoned"); for (a) in map.values() { if let Some(mut addr) = a.upgrade() { let _ = addr.stop(Some(ProtocolError::Closing.into())); } } let map2 = stream_actors.read().expect("RwLock poisoned"); for (a) in map2.values() { if let Some(mut addr) = a.upgrade() { let _ = addr.stop(Some(ProtocolError::Closing.into())); } } return Err(ProtocolError::Closing); } if message.is_request() { debug_println!("is request {}", message.id()); // closing connection. a client is not supposed to receive requests. return Err(ProtocolError::Closing); } else if message.is_response() { let id = message.id(); //debug_println!("is response for {}", id); { let map = actors.read().expect("RwLock poisoned"); match map.get(&id) { Some(weak_addr) => match weak_addr.upgrade() { Some(addr) => { addr.send(BrokerMessageXActor(message)) .map_err(|e| ProtocolError::Closing)? //.expect("sending message back to actor failed"); } None => { debug_println!("ERROR. Addr is dead for ID {}", id); return Err(ProtocolError::Closing); } }, None => { let map2 = stream_actors.read().expect("RwLock poisoned"); match map2.get(&id) { Some(weak_addr) => match weak_addr.upgrade() { Some(addr) => { addr.send(BrokerMessageXActor(message)) .map_err(|e| ProtocolError::Closing)? //.expect("sending message back to stream actor failed"); } None => { debug_println!( "ERROR. Addr is dead for ID {} {:?}", id, message ); return Err(ProtocolError::Closing); } }, None => { debug_println!("Actor ID not found {} {:?}", id, message); return Err(ProtocolError::Closing); } } } } } } }, None => break, } } } Ok(()) } pub fn open + StreamExt + Send + Sync + Unpin + 'static>( writer: T, reader: U, user: PubKey, ) -> BrokerConnectionRemote { let actors: Arc>>> = Arc::new(RwLock::new(HashMap::new())); let stream_actors: Arc>>> = Arc::new(RwLock::new(HashMap::new())); let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); let w = Arc::new(Mutex::new(Box::pin(writer))); let ws_in_task = Arc::clone(&w); let actors_in_thread = Arc::clone(&actors); let stream_actors_in_thread = Arc::clone(&stream_actors); task::spawn(async move { debug_println!("START of reader loop"); if let Err(e) = Self::connection_reader_loop(reader, actors_in_thread, stream_actors_in_thread, shutdown_receiver) .await { debug_println!("closing because of {}", e); let _ = ws_in_task.lock().await.close().await; } debug_println!("END of reader loop"); }); BrokerConnectionRemote:: { writer: Arc::clone(&w), user, actors: Arc::clone(&actors), stream_actors: Arc::clone(&stream_actors), shutdown:shutdown_sender , } } }