sending Ids of ProtocolMessage

Niko 2 years ago
parent 5d8509e1dc
commit fd392f2ac9
  1. 3
      ng-app-js/src/lib.rs
  2. 2
      p2p-client-ws/src/remote_ws.rs
  3. 14
      p2p-net/src/actor.rs
  4. 5
      p2p-net/src/broker.rs
  5. 49
      p2p-net/src/connection.rs
  6. 75
      p2p-net/src/types.rs

@ -41,6 +41,9 @@ pub async fn greet(name: &str) {
) )
.await; .await;
log!("broker.connect : {:?}", res); log!("broker.connect : {:?}", res);
if res.is_err() {
panic!("Cannot connect");
}
BROKER.read().await.print_status(); BROKER.read().await.print_status();
//res.expect_throw("assume the connection succeeds"); //res.expect_throw("assume the connection succeeds");

@ -340,7 +340,7 @@ mod test {
) )
.await; .await;
log!("broker.connect : {:?}", res); log!("broker.connect : {:?}", res);
//res.expect_throw("assume the connection succeeds"); res.expect("assume the connection succeeds");
} }
async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> { async fn timer_close(remote_peer_id: DirectPeerId) -> ResultSend<()> {

@ -42,7 +42,7 @@ pub trait EActor: Send + Sync + std::fmt::Debug {
pub struct Actor< pub struct Actor<
'a, 'a,
A: BrokerRequest, A: BrokerRequest,
B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + std::marker::Sync, B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync,
> { > {
id: i64, id: i64,
phantom_a: PhantomData<&'a A>, phantom_a: PhantomData<&'a A>,
@ -103,19 +103,19 @@ impl<
} }
} }
pub fn verify(&self, msg: ProtocolMessage) -> bool { // pub fn verify(&self, msg: ProtocolMessage) -> bool {
self.initiator && msg.type_id() == TypeId::of::<B>() // self.initiator && msg.type_id() == TypeId::of::<B>()
|| !self.initiator && msg.type_id() == TypeId::of::<A>() // || !self.initiator && msg.type_id() == TypeId::of::<A>()
} // }
pub async fn request( pub async fn request(
&mut self, &mut self,
msg: impl BrokerRequest + std::marker::Sync + std::marker::Send, msg: ProtocolMessage,
stream: Option<impl BrokerRequest + std::marker::Send>, stream: Option<impl BrokerRequest + std::marker::Send>,
fsm: Arc<Mutex<NoiseFSM>>, fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<B, ProtocolError> { ) -> Result<B, ProtocolError> {
//sender.send(ConnectionCommand::Msg(msg.send())).await; //sender.send(ConnectionCommand::Msg(msg.send())).await;
fsm.lock().await.send(msg.send()).await?; fsm.lock().await.send(msg).await?;
match self.receiver.next().await { match self.receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => msg.try_into(), Some(ConnectionCommand::Msg(msg)) => msg.try_into(),
_ => Err(ProtocolError::ActorError), _ => Err(ProtocolError::ActorError),

@ -177,9 +177,8 @@ impl Broker {
//let cnx = Arc::new(); //let cnx = Arc::new();
let (priv_key, pub_key) = generate_keypair(); let (priv_key, pub_key) = generate_keypair();
log!("CONNECTING"); log!("CONNECTING");
let connection_res = cnx.open(ip, priv_key, pub_key, remote_peer_id).await; let mut connection = cnx.open(ip, priv_key, pub_key, remote_peer_id).await?;
log!("CONNECTED {:?}", connection_res);
let mut connection = connection_res.unwrap();
let join = connection.take_shutdown(); let join = connection.take_shutdown();
let connected = if core.is_some() { let connected = if core.is_some() {

@ -135,21 +135,21 @@ impl NoiseFSM {
} }
} }
pub async fn receive( // pub async fn receive(
&mut self, // &mut self,
msg: ProtocolMessage, // msg: ProtocolMessage,
) -> Result<ProtocolMessage, ProtocolError> { // ) -> Result<ProtocolMessage, ProtocolError> {
if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() { // if self.state == FSMstate::AuthResult && self.noise_cipher_state.is_some() {
if let ProtocolMessage::Noise(noise) = msg { // if let ProtocolMessage::Noise(noise) = msg {
let new = self.decrypt(&noise); // let new = self.decrypt(&noise);
Ok(new) // Ok(new)
} else { // } else {
Err(ProtocolError::MustBeEncrypted) // Err(ProtocolError::MustBeEncrypted)
} // }
} else { // } else {
Err(ProtocolError::InvalidState) // Err(ProtocolError::InvalidState)
} // }
} // }
pub fn step( pub fn step(
&mut self, &mut self,
@ -181,7 +181,7 @@ impl NoiseFSM {
FSMstate::Noise0 => {} FSMstate::Noise0 => {}
FSMstate::Noise1 => {} FSMstate::Noise1 => {}
FSMstate::Noise2 => {} FSMstate::Noise2 => {}
FSMstate::Noise3 => {} FSMstate::Noise3 => {} //set noise_handshake_state to none
FSMstate::ExtRequest => {} FSMstate::ExtRequest => {}
FSMstate::ExtResponse => {} FSMstate::ExtResponse => {}
FSMstate::ClientHello => {} FSMstate::ClientHello => {}
@ -348,11 +348,8 @@ impl ConnectionBase {
} }
pub async fn request< pub async fn request<
A: crate::actor::BrokerRequest + std::marker::Sync + std::marker::Send + 'static, A: crate::actor::BrokerRequest + Sync + Send + 'static,
B: TryFrom<ProtocolMessage, Error = ProtocolError> B: TryFrom<ProtocolMessage, Error = ProtocolError> + std::fmt::Debug + Sync + 'static,
+ std::fmt::Debug
+ std::marker::Sync
+ 'static,
>( >(
&self, &self,
msg: A, msg: A,
@ -368,9 +365,13 @@ impl ConnectionBase {
} }
let mut actor = Box::new(Actor::<A, B>::new(id, true)); let mut actor = Box::new(Actor::<A, B>::new(id, true));
self.actors.lock().await.insert(id, actor.get_receiver_tx()); self.actors.lock().await.insert(id, actor.get_receiver_tx());
actor let mut proto_msg = msg.send();
.request(msg, stream, Arc::clone(self.fsm.as_ref().unwrap())) proto_msg.set_id(id);
.await let res = actor
.request(proto_msg, stream, Arc::clone(self.fsm.as_ref().unwrap()))
.await;
self.actors.lock().await.remove(&id);
res
} }
pub async fn send(&mut self, cmd: ConnectionCommand) { pub async fn send(&mut self, cmd: ConnectionCommand) {

@ -842,6 +842,13 @@ impl BrokerRequest {
BrokerRequest::V0(o) => o.id, BrokerRequest::V0(o) => o.id,
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
BrokerRequest::V0(v0) => {
v0.id = id;
}
}
}
pub fn type_id(&self) -> TypeId { pub fn type_id(&self) -> TypeId {
match self { match self {
BrokerRequest::V0(o) => o.content.type_id(), BrokerRequest::V0(o) => o.content.type_id(),
@ -876,6 +883,13 @@ impl BrokerResponse {
BrokerResponse::V0(o) => o.id, BrokerResponse::V0(o) => o.id,
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
BrokerResponse::V0(v0) => {
v0.id = id;
}
}
}
pub fn result(&self) -> u16 { pub fn result(&self) -> u16 {
match self { match self {
BrokerResponse::V0(o) => o.result, BrokerResponse::V0(o) => o.result,
@ -1194,6 +1208,13 @@ impl BrokerOverlayRequest {
BrokerOverlayRequest::V0(o) => o.id, BrokerOverlayRequest::V0(o) => o.id,
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
BrokerOverlayRequest::V0(v0) => {
v0.id = id;
}
}
}
pub fn content_v0(&self) -> &BrokerOverlayRequestContentV0 { pub fn content_v0(&self) -> &BrokerOverlayRequestContentV0 {
match self { match self {
BrokerOverlayRequest::V0(o) => &o.content, BrokerOverlayRequest::V0(o) => &o.content,
@ -1234,6 +1255,13 @@ impl BrokerOverlayResponse {
BrokerOverlayResponse::V0(o) => o.id, BrokerOverlayResponse::V0(o) => o.id,
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
BrokerOverlayResponse::V0(v0) => {
v0.id = id;
}
}
}
pub fn result(&self) -> u16 { pub fn result(&self) -> u16 {
match self { match self {
BrokerOverlayResponse::V0(o) => o.result, BrokerOverlayResponse::V0(o) => o.result,
@ -1329,6 +1357,17 @@ impl BrokerOverlayMessage {
}, },
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
BrokerOverlayMessage::V0(o) => match &mut o.content {
BrokerOverlayMessageContentV0::BrokerOverlayResponse(ref mut r) => r.set_id(id),
BrokerOverlayMessageContentV0::BrokerOverlayRequest(ref mut r) => r.set_id(id),
BrokerOverlayMessageContentV0::Event(_) => {
panic!("it is an event")
}
},
}
}
pub fn result(&self) -> u16 { pub fn result(&self) -> u16 {
match self { match self {
BrokerOverlayMessage::V0(o) => match &o.content { BrokerOverlayMessage::V0(o) => match &o.content {
@ -1450,6 +1489,16 @@ impl BrokerMessage {
BrokerMessage::Close => panic!("Close not implemented"), BrokerMessage::Close => panic!("Close not implemented"),
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
BrokerMessage::V0(o) => match &mut o.content {
BrokerMessageContentV0::BrokerOverlayMessage(ref mut p) => p.set_id(id),
BrokerMessageContentV0::BrokerResponse(ref mut r) => r.set_id(id),
BrokerMessageContentV0::BrokerRequest(ref mut r) => r.set_id(id),
},
BrokerMessage::Close => panic!("Close not implemented"),
}
}
pub fn result(&self) -> u16 { pub fn result(&self) -> u16 {
match self { match self {
BrokerMessage::V0(o) => match &o.content { BrokerMessage::V0(o) => match &o.content {
@ -1577,6 +1626,13 @@ impl ExtRequest {
ExtRequest::V0(v0) => v0.id, ExtRequest::V0(v0) => v0.id,
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
ExtRequest::V0(v0) => {
v0.id = id;
}
}
}
} }
/// Content of ExtResponseV0 /// Content of ExtResponseV0
@ -1612,6 +1668,13 @@ impl ExtResponse {
ExtResponse::V0(v0) => v0.id, ExtResponse::V0(v0) => v0.id,
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
ExtResponse::V0(v0) => {
v0.id = id;
}
}
}
} }
impl TryFrom<ProtocolMessage> for ExtResponse { impl TryFrom<ProtocolMessage> for ExtResponse {
@ -1653,6 +1716,18 @@ impl ProtocolMessage {
ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.id(), ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.id(),
} }
} }
pub fn set_id(&mut self, id: i64) {
match self {
ProtocolMessage::Noise(_) => panic!("cannot set ID"),
ProtocolMessage::Start(_) => panic!("cannot set ID"),
ProtocolMessage::ServerHello(_) => panic!("cannot set ID"),
ProtocolMessage::ClientAuth(_) => panic!("cannot set ID"),
ProtocolMessage::AuthResult(_) => panic!("cannot set ID"),
ProtocolMessage::ExtRequest(ext_req) => ext_req.set_id(id),
ProtocolMessage::ExtResponse(ext_res) => ext_res.set_id(id),
ProtocolMessage::BrokerMessage(broker_msg) => broker_msg.set_id(id),
}
}
pub fn type_id(&self) -> TypeId { pub fn type_id(&self) -> TypeId {
match self { match self {
ProtocolMessage::Noise(a) => a.type_id(), ProtocolMessage::Noise(a) => a.type_id(),

Loading…
Cancel
Save