close: refine close semantics

Signed-off-by: Alexey Galakhov <agalakhov@snapview.de>
pull/60/head
Alexey Galakhov 6 years ago
parent de90c3877d
commit b8f7d3668e
  1. 29
      src/error.rs
  2. 69
      src/protocol/mod.rs

@ -11,7 +11,6 @@ use std::string;
use httparse;
use protocol::frame::CloseFrame;
use protocol::Message;
#[cfg(feature="tls")]
@ -26,8 +25,20 @@ pub type Result<T> = result::Result<T, Error>;
/// Possible WebSocket errors
#[derive(Debug)]
pub enum Error {
/// WebSocket connection closed (normally)
ConnectionClosed(Option<CloseFrame<'static>>),
/// WebSocket connection closed normally
///
/// Upon receiving this, the server must drop the WebSocket object as soon as possible
/// to close the connection.
/// The client gets this error if the connection is already closed at the server side.
///
/// Receiving this error means that the WebSocket object is not usable anymore and the only
/// meaningful action with it is dropping it.
ConnectionClosed,
/// Trying to work with already closed connection
///
/// Trying to write after receiving `Message::Close` or trying to read after receiving
/// `Error::ConnectionClosed` causes this.
AlreadyClosed,
/// Input-output error
Io(io::Error),
#[cfg(feature="tls")]
@ -50,13 +61,8 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::ConnectionClosed(ref frame) => {
if let Some(ref cf) = *frame {
write!(f, "Connection closed: {}", cf)
} else {
write!(f, "Connection closed (empty close frame)")
}
}
Error::ConnectionClosed => write!(f, "Connection closed normally"),
Error::AlreadyClosed => write!(f, "Trying to work with closed connection"),
Error::Io(ref err) => write!(f, "IO error: {}", err),
#[cfg(feature="tls")]
Error::Tls(ref err) => write!(f, "TLS error: {}", err),
@ -73,7 +79,8 @@ impl fmt::Display for Error {
impl ErrorTrait for Error {
fn description(&self) -> &str {
match *self {
Error::ConnectionClosed(_) => "A close handshake is performed",
Error::ConnectionClosed => "A close handshake is performed",
Error::AlreadyClosed => "Trying to read or write after getting close notification",
Error::Io(ref err) => err.description(),
#[cfg(feature="tls")]
Error::Tls(ref err) => err.description(),

@ -8,7 +8,7 @@ pub use self::message::Message;
pub use self::frame::CloseFrame;
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::io::{Read, Write, ErrorKind as IoErrorKind};
use std::mem::replace;
use error::{Error, Result};
@ -206,6 +206,9 @@ impl WebSocketContext {
where
Stream: Read + Write,
{
// Do not read from already closed connections.
self.state.check_active()?;
loop {
// Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking.
@ -231,6 +234,9 @@ impl WebSocketContext {
where
Stream: Read + Write,
{
// Do not write to already closed connections.
self.state.check_active()?;
if let Some(max_send_queue) = self.config.max_send_queue {
if self.send_queue.len() >= max_send_queue {
// Try to make some room for the new message.
@ -288,17 +294,14 @@ impl WebSocketContext {
// willing to take more data.
// If we're closing and there is nothing to send anymore, we should close the connection.
if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
if let (Role::Server, WebSocketState::ClosedByPeer) = (&self.role, &self.state) {
// The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
match self.role {
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed(frame.take())),
}
Err(Error::ConnectionClosed)
} else {
Ok(())
}
@ -449,9 +452,10 @@ impl WebSocketContext {
} // match opcode
} else {
// Connection closed by peer
match replace(&mut self.state, WebSocketState::Terminated) {
WebSocketState::CloseAcknowledged(close) | WebSocketState::ClosedByPeer(close) => {
Ok(Some(Message::Close(close)))
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
Err(Error::ConnectionClosed)
}
_ => {
Err(Error::Protocol("Connection reset without closing handshake".into()))
@ -461,13 +465,12 @@ impl WebSocketContext {
}
/// Received a close frame. Tells if we need to return a close frame to the user.
fn do_close(&mut self, close: Option<CloseFrame>) -> Option<Option<CloseFrame<'static>>> {
fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
debug!("Received close frame: {:?}", close);
match self.state {
WebSocketState::Active => {
let close_code = close.as_ref().map(|f| f.code);
let close = close.map(CloseFrame::into_owned);
self.state = WebSocketState::ClosedByPeer(close.clone());
self.state = WebSocketState::ClosedByPeer;
let reply = if let Some(code) = close_code {
if code.is_allowed() {
Frame::close(Some(CloseFrame {
@ -488,25 +491,15 @@ impl WebSocketContext {
Some(close)
}
WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
// It is already closed, just ignore.
None
}
WebSocketState::ClosedByUs => {
// We received a reply.
let close = close.map(CloseFrame::into_owned);
match self.role {
Role::Client => {
// Client waits for the server to close the connection.
self.state = WebSocketState::CloseAcknowledged(close);
None
}
Role::Server => {
// Server closes the connection.
self.state = WebSocketState::CloseAcknowledged;
Some(close)
}
}
}
WebSocketState::Terminated => unreachable!(),
}
}
@ -525,7 +518,20 @@ impl WebSocketContext {
frame.set_random_mask();
}
}
self.frame.write_frame(stream, frame)
let res = self.frame.write_frame(stream, frame);
// An expected "Connection reset by peer" is not fatal
match res {
Err(Error::Io(err)) => Err({
match self.state {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged
if err.kind() == IoErrorKind::ConnectionReset =>
Error::ConnectionClosed,
_ => Error::Io(err),
}
}),
x => x,
}
}
}
@ -538,9 +544,9 @@ enum WebSocketState {
/// We initiated a close handshake.
ClosedByUs,
/// The peer initiated a close handshake.
ClosedByPeer(Option<CloseFrame<'static>>),
ClosedByPeer,
/// The peer replied to our close handshake.
CloseAcknowledged(Option<CloseFrame<'static>>),
CloseAcknowledged,
/// The connection does not exist anymore.
Terminated,
}
@ -553,6 +559,17 @@ impl WebSocketState {
_ => false,
}
}
/// Check if the state is active, return error if not.
fn check_active(&self) -> Result<()> {
match self {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged
=> Err(Error::ConnectionClosed),
WebSocketState::Terminated
=> Err(Error::AlreadyClosed),
_ => Ok(()),
}
}
}
#[cfg(test)]

Loading…
Cancel
Save