Make `Close` part of the `Message` enum

pull/56/head
Daniel Abramov 6 years ago
parent c33f4f3895
commit c8c3d6290d
  1. 3
      examples/autobahn-client.rs
  2. 3
      examples/autobahn-server.rs
  3. 2
      src/protocol/frame/frame.rs
  4. 18
      src/protocol/message.rs
  5. 88
      src/protocol/mod.rs

@ -39,7 +39,8 @@ fn run_test(case: u32) -> Result<()> {
socket.write_message(msg)?; socket.write_message(msg)?;
} }
Message::Ping(_) | Message::Ping(_) |
Message::Pong(_) => {} Message::Pong(_) |
Message::Close(_) => {}
} }
} }
} }

@ -24,7 +24,8 @@ fn handle_client(stream: TcpStream) -> Result<()> {
socket.write_message(msg)?; socket.write_message(msg)?;
} }
Message::Ping(_) | Message::Ping(_) |
Message::Pong(_) => {} Message::Pong(_) |
Message::Close(_) => {}
} }
} }
} }

@ -11,7 +11,7 @@ use super::coding::{OpCode, Control, Data, CloseCode};
use super::mask::{generate_mask, apply_mask}; use super::mask::{generate_mask, apply_mask};
/// A struct representing the close command. /// A struct representing the close command.
#[derive(Debug, Clone)] #[derive(Debug, Clone, Eq, PartialEq)]
pub struct CloseFrame<'t> { pub struct CloseFrame<'t> {
/// The reason as a code. /// The reason as a code.
pub code: CloseCode, pub code: CloseCode,

@ -4,6 +4,7 @@ use std::result::Result as StdResult;
use std::str; use std::str;
use error::{Result, Error}; use error::{Result, Error};
use super::frame::CloseFrame;
mod string_collect { mod string_collect {
@ -179,6 +180,8 @@ pub enum Message {
/// ///
/// The payload here must have a length less than 125 bytes /// The payload here must have a length less than 125 bytes
Pong(Vec<u8>), Pong(Vec<u8>),
/// A close message with the optional close frame.
Close(Option<CloseFrame<'static>>),
} }
impl Message { impl Message {
@ -229,6 +232,14 @@ impl Message {
} }
} }
/// Indicates whether a message ia s close message.
pub fn is_close(&self) -> bool {
match *self {
Message::Close(_) => true,
_ => false,
}
}
/// Get the length of the WebSocket message. /// Get the length of the WebSocket message.
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
match *self { match *self {
@ -236,6 +247,7 @@ impl Message {
Message::Binary(ref data) | Message::Binary(ref data) |
Message::Ping(ref data) | Message::Ping(ref data) |
Message::Pong(ref data) => data.len(), Message::Pong(ref data) => data.len(),
Message::Close(ref data) => data.as_ref().map(|d| d.reason.len()).unwrap_or(0),
} }
} }
@ -252,6 +264,8 @@ impl Message {
Message::Binary(data) | Message::Binary(data) |
Message::Ping(data) | Message::Ping(data) |
Message::Pong(data) => data, Message::Pong(data) => data,
Message::Close(None) => Vec::new(),
Message::Close(Some(frame)) => frame.reason.into_owned().into_bytes(),
} }
} }
@ -263,6 +277,8 @@ impl Message {
Message::Ping(data) | Message::Ping(data) |
Message::Pong(data) => Ok(try!( Message::Pong(data) => Ok(try!(
String::from_utf8(data).map_err(|err| err.utf8_error()))), String::from_utf8(data).map_err(|err| err.utf8_error()))),
Message::Close(None) => Ok(String::new()),
Message::Close(Some(frame)) => Ok(frame.reason.into_owned()),
} }
} }
@ -274,6 +290,8 @@ impl Message {
Message::Binary(ref data) | Message::Binary(ref data) |
Message::Ping(ref data) | Message::Ping(ref data) |
Message::Pong(ref data) => Ok(try!(str::from_utf8(data))), Message::Pong(ref data) => Ok(try!(str::from_utf8(data))),
Message::Close(None) => Ok(""),
Message::Close(Some(ref frame)) => Ok(&frame.reason),
} }
} }

@ -8,7 +8,7 @@ pub use self::message::Message;
pub use self::frame::CloseFrame; pub use self::frame::CloseFrame;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{Read, Write, ErrorKind as IoErrorKind}; use std::io::{Read, Write};
use std::mem::replace; use std::mem::replace;
use error::{Error, Result}; use error::{Error, Result};
@ -146,8 +146,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.write_pending().no_block()?; self.write_pending().no_block()?;
// If we get here, either write blocks or we have nothing to write. // If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock. // Thus if read blocks, just let it return WouldBlock.
let res = self.read_message_frame(); if let Some(message) = self.read_message_frame()? {
if let Some(message) = self.translate_close(res)? {
trace!("Received message {}", message); trace!("Received message {}", message);
return Ok(message) return Ok(message)
} }
@ -188,34 +187,19 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.pong = Some(Frame::pong(data)); self.pong = Some(Frame::pong(data));
return self.write_pending() return self.write_pending()
} }
}; Message::Close(code) => {
return self.close(code)
self.send_queue.push_back(frame);
self.write_pending()
} }
};
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again.
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(code);
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
} else {
// Already closed, nothing to do.
}
self.write_pending() self.write_pending()
} }
/// Flush the pending send queue. /// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> { pub fn write_pending(&mut self) -> Result<()> {
// First, make sure we have no pending frame sending. // First, make sure we have no pending frame sending.
{ self.socket.write_pending()?;
let res = self.socket.write_pending();
self.translate_close(res)?;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
@ -247,6 +231,22 @@ impl<Stream: Read + Write> WebSocket<Stream> {
Ok(()) Ok(())
} }
} }
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// the same as calling `write(Message::Close(..))`.
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(code);
self.send_queue.push_back(frame);
} else {
// Already closed, nothing to do.
}
self.write_pending()
}
} }
impl<Stream: Read + Write> WebSocket<Stream> { impl<Stream: Read + Write> WebSocket<Stream> {
@ -299,7 +299,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
Err(Error::Protocol("Control frame too big".into())) Err(Error::Protocol("Control frame too big".into()))
} }
OpCtl::Close => { OpCtl::Close => {
self.do_close(frame.into_close()?).map(|_| None) Ok(self.do_close(frame.into_close()?).map(Message::Close))
} }
OpCtl::Reserved(i) => { OpCtl::Reserved(i) => {
Err(Error::Protocol(format!("Unknown control frame type {}", i).into())) Err(Error::Protocol(format!("Unknown control frame type {}", i).into()))
@ -373,7 +373,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} else { } else {
match replace(&mut self.state, WebSocketState::Terminated) { match replace(&mut self.state, WebSocketState::Terminated) {
WebSocketState::CloseAcknowledged(close) | WebSocketState::ClosedByPeer(close) => { WebSocketState::CloseAcknowledged(close) | WebSocketState::ClosedByPeer(close) => {
Err(Error::ConnectionClosed(close)) Ok(Some(Message::Close(close)))
} }
_ => { _ => {
Err(Error::Protocol("Connection reset without closing handshake".into())) Err(Error::Protocol("Connection reset without closing handshake".into()))
@ -382,13 +382,14 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
} }
/// Received a close frame. /// Received a close frame. Tells if we need to return a close frame to the user.
fn do_close(&mut self, close: Option<CloseFrame>) -> Result<()> { fn do_close(&mut self, close: Option<CloseFrame>) -> Option<Option<CloseFrame<'static>>> {
debug!("Received close frame: {:?}", close); debug!("Received close frame: {:?}", close);
match self.state { match self.state {
WebSocketState::Active => { WebSocketState::Active => {
let close_code = close.as_ref().map(|f| f.code); let close_code = close.as_ref().map(|f| f.code);
self.state = WebSocketState::ClosedByPeer(close.map(CloseFrame::into_owned)); let close = close.map(CloseFrame::into_owned);
self.state = WebSocketState::ClosedByPeer(close.clone());
let reply = if let Some(code) = close_code { let reply = if let Some(code) = close_code {
if code.is_allowed() { if code.is_allowed() {
Frame::close(Some(CloseFrame { Frame::close(Some(CloseFrame {
@ -406,11 +407,12 @@ impl<Stream: Read + Write> WebSocket<Stream> {
}; };
debug!("Replying to close with {:?}", reply); debug!("Replying to close with {:?}", reply);
self.send_queue.push_back(reply); self.send_queue.push_back(reply);
Ok(())
Some(close)
} }
WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => { WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => {
// It is already closed, just ignore. // It is already closed, just ignore.
Ok(()) None
} }
WebSocketState::ClosedByUs => { WebSocketState::ClosedByUs => {
// We received a reply. // We received a reply.
@ -419,11 +421,11 @@ impl<Stream: Read + Write> WebSocket<Stream> {
Role::Client => { Role::Client => {
// Client waits for the server to close the connection. // Client waits for the server to close the connection.
self.state = WebSocketState::CloseAcknowledged(close); self.state = WebSocketState::CloseAcknowledged(close);
Ok(()) None
} }
Role::Server => { Role::Server => {
// Server closes the connection. // Server closes the connection.
Err(Error::ConnectionClosed(close)) Some(close)
} }
} }
} }
@ -442,30 +444,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
frame.set_random_mask(); frame.set_random_mask();
} }
} }
let res = self.socket.write_frame(frame); self.socket.write_frame(frame)
self.translate_close(res)
}
/// Translate a "Connection reset by peer" into ConnectionClosed as needed.
fn translate_close<T>(&mut self, res: Result<T>) -> Result<T> {
match res {
Err(Error::Io(err)) => Err({
if err.kind() == IoErrorKind::ConnectionReset {
match self.state {
WebSocketState::ClosedByPeer(ref mut frame) =>
Error::ConnectionClosed(frame.take()),
WebSocketState::CloseAcknowledged(ref mut frame) =>
Error::ConnectionClosed(frame.take()),
_ => Error::Io(err),
} }
} else {
Error::Io(err)
}
}),
x => x,
}
}
} }
/// The current connection state. /// The current connection state.

Loading…
Cancel
Save