Return close frame with ConnectionClosed

This is a breaking change, thus bump the API version.

Closes: #10

Signed-off-by: Alexey Galakhov <agalakhov@snapview.de>
pull/12/head
Alexey Galakhov 8 years ago
parent 8df6bdbeb0
commit 66e30b2767
  1. 2
      Cargo.toml
  2. 4
      examples/autobahn-client.rs
  3. 2
      examples/client.rs
  4. 14
      src/error.rs
  5. 68
      src/protocol/mod.rs

@ -9,7 +9,7 @@ readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs" homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite" documentation = "https://docs.rs/tungstenite"
repository = "https://github.com/snapview/tungstenite-rs" repository = "https://github.com/snapview/tungstenite-rs"
version = "0.1.2" version = "0.2.0"
[features] [features]
default = ["tls"] default = ["tls"]

@ -14,7 +14,7 @@ fn get_case_count() -> Result<u32> {
Url::parse("ws://localhost:9001/getCaseCount").unwrap() Url::parse("ws://localhost:9001/getCaseCount").unwrap()
)?; )?;
let msg = socket.read_message()?; let msg = socket.read_message()?;
socket.close()?; socket.close(None)?;
Ok(msg.into_text()?.parse::<u32>().unwrap()) Ok(msg.into_text()?.parse::<u32>().unwrap())
} }
@ -22,7 +22,7 @@ fn update_reports() -> Result<()> {
let mut socket = connect( let mut socket = connect(
Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap() Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap()
)?; )?;
socket.close()?; socket.close(None)?;
Ok(()) Ok(())
} }

@ -16,6 +16,6 @@ fn main() {
let msg = socket.read_message().expect("Error reading message"); let msg = socket.read_message().expect("Error reading message");
println!("Received: {}", msg); println!("Received: {}", msg);
} }
// socket.close(); // socket.close(None);
} }

@ -11,6 +11,8 @@ use std::string;
use httparse; use httparse;
use protocol::frame::CloseFrame;
#[cfg(feature="tls")] #[cfg(feature="tls")]
pub mod tls { pub mod tls {
pub use native_tls::Error; pub use native_tls::Error;
@ -22,7 +24,7 @@ pub type Result<T> = result::Result<T, Error>;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// WebSocket connection closed (normally) /// WebSocket connection closed (normally)
ConnectionClosed, ConnectionClosed(Option<CloseFrame<'static>>),
/// Input-output error /// Input-output error
Io(io::Error), Io(io::Error),
#[cfg(feature="tls")] #[cfg(feature="tls")]
@ -43,7 +45,13 @@ pub enum Error {
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::ConnectionClosed => write!(f, "Connection closed"), Error::ConnectionClosed(ref frame) => {
if let Some(ref cf) = *frame {
write!(f, "Connection closed: {}", cf)
} else {
write!(f, "Connection closed (empty close frame)")
}
}
Error::Io(ref err) => write!(f, "IO error: {}", err), Error::Io(ref err) => write!(f, "IO error: {}", err),
#[cfg(feature="tls")] #[cfg(feature="tls")]
Error::Tls(ref err) => write!(f, "TLS error: {}", err), Error::Tls(ref err) => write!(f, "TLS error: {}", err),
@ -59,7 +67,7 @@ impl fmt::Display for Error {
impl ErrorTrait for Error { impl ErrorTrait for Error {
fn description(&self) -> &str { fn description(&self) -> &str {
match *self { match *self {
Error::ConnectionClosed => "", Error::ConnectionClosed(_) => "A close handshake is performed",
Error::Io(ref err) => err.description(), Error::Io(ref err) => err.description(),
#[cfg(feature="tls")] #[cfg(feature="tls")]
Error::Tls(ref err) => err.description(), Error::Tls(ref err) => err.description(),

@ -8,7 +8,7 @@ pub use self::message::Message;
pub use self::frame::CloseFrame; pub use self::frame::CloseFrame;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{Read, Write}; use std::io::{Read, Write, ErrorKind as IoErrorKind};
use std::mem::replace; use std::mem::replace;
use error::{Error, Result}; use error::{Error, Result};
@ -90,7 +90,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.write_pending().no_block()?; self.write_pending().no_block()?;
// If we get here, either write blocks or we have nothing to write. // If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock. // Thus if read blocks, just let it return WouldBlock.
if let Some(message) = self.read_message_frame()? { let res = self.read_message_frame();
if let Some(message) = self.translate_close(res)? {
trace!("Received message {}", message); trace!("Received message {}", message);
return Ok(message) return Ok(message)
} }
@ -124,11 +125,11 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// ///
/// This function guarantees that the close frame will be queued. /// This function guarantees that the close frame will be queued.
/// There is no need to call it again, just like write_message(). /// There is no need to call it again, just like write_message().
pub fn close(&mut self) -> Result<()> { pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
match self.state { match self.state {
WebSocketState::Active => { WebSocketState::Active => {
self.state = WebSocketState::ClosedByUs; self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(None); let frame = Frame::close(code);
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
} }
_ => { _ => {
@ -141,7 +142,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// Flush the pending send queue. /// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> { pub fn write_pending(&mut self) -> Result<()> {
// First, make sure we have no pending frame sending. // First, make sure we have no pending frame sending.
self.socket.write_pending()?; {
let res = self.socket.write_pending();
self.translate_close(res)?;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
@ -155,8 +159,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
// If we're closing and there is nothing to send anymore, we should close the connection. // If we're closing and there is nothing to send anymore, we should close the connection.
match self.state { if self.send_queue.is_empty() {
WebSocketState::ClosedByPeer if self.send_queue.is_empty() => { if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
// The underlying TCP connection, in most normal cases, SHOULD be closed // The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the // first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2 // client (as this would prevent it from re-opening the connection for 2
@ -165,10 +169,13 @@ impl<Stream: Read + Write> WebSocket<Stream> {
// a new SYN with a higher seq number). (RFC 6455) // a new SYN with a higher seq number). (RFC 6455)
match self.role { match self.role {
Role::Client => Ok(()), Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed), Role::Server => Err(Error::ConnectionClosed(replace(frame, None))),
} }
} else {
Ok(())
} }
_ => Ok(()), } else {
Ok(())
} }
} }
@ -295,10 +302,12 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// Received a close frame. /// Received a close frame.
fn do_close(&mut self, close: Option<CloseFrame>) -> Result<()> { fn do_close(&mut self, close: Option<CloseFrame>) -> Result<()> {
debug!("Received close frame: {:?}", close);
match self.state { match self.state {
WebSocketState::Active => { WebSocketState::Active => {
self.state = WebSocketState::ClosedByPeer; let close_code = close.as_ref().map(|f| f.code);
let reply = if let Some(CloseFrame { code, .. }) = close { self.state = WebSocketState::ClosedByPeer(close.map(CloseFrame::into_owned));
let reply = if let Some(code) = close_code {
if code.is_allowed() { if code.is_allowed() {
Frame::close(Some(CloseFrame { Frame::close(Some(CloseFrame {
code: CloseCode::Normal, code: CloseCode::Normal,
@ -313,23 +322,26 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} else { } else {
Frame::close(None) Frame::close(None)
}; };
debug!("Replying to close with {:?}", reply);
self.send_queue.push_back(reply); self.send_queue.push_back(reply);
Ok(()) Ok(())
} }
WebSocketState::ClosedByPeer => { WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => {
// It is already closed, just ignore. // It is already closed, just ignore.
Ok(()) Ok(())
} }
WebSocketState::ClosedByUs => { WebSocketState::ClosedByUs => {
// We received a reply. // We received a reply.
let close = close.map(CloseFrame::into_owned);
match self.role { match self.role {
Role::Client => { Role::Client => {
// Client waits for the server to close the connection. // Client waits for the server to close the connection.
self.state = WebSocketState::CloseAcknowledged(close);
Ok(()) Ok(())
} }
Role::Server => { Role::Server => {
// Server closes the connection. // Server closes the connection.
Err(Error::ConnectionClosed) Err(Error::ConnectionClosed(close))
} }
} }
} }
@ -368,16 +380,42 @@ impl<Stream: Read + Write> WebSocket<Stream> {
frame.set_mask(); frame.set_mask();
} }
} }
self.socket.write_frame(frame) let res = self.socket.write_frame(frame);
self.translate_close(res)
}
/// Translate a "Connection reset by peer" into ConnectionClosed as needed.
fn translate_close<T>(&mut self, res: Result<T>) -> Result<T> {
match res {
Err(Error::Io(err)) => Err({
if err.kind() == IoErrorKind::ConnectionReset {
match self.state {
WebSocketState::ClosedByPeer(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
WebSocketState::CloseAcknowledged(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
_ => Error::Io(err),
}
} else {
Error::Io(err)
}
}),
x => x,
}
} }
} }
/// The current connection state. /// The current connection state.
enum WebSocketState { enum WebSocketState {
/// The connection is active.
Active, Active,
/// We initiated a close handshake.
ClosedByUs, ClosedByUs,
ClosedByPeer, /// The peer initiated a close handshake.
ClosedByPeer(Option<CloseFrame<'static>>),
/// The peer replied to our close handshake.
CloseAcknowledged(Option<CloseFrame<'static>>),
} }
impl WebSocketState { impl WebSocketState {

Loading…
Cancel
Save