Merge pull request #60 from snapview/close-fixes

Fixes for the "close-is-a-message" semantics.
pull/63/head v0.8.0
Alexey Galakhov 6 years ago committed by GitHub
commit dd300f8bd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      Cargo.toml
  2. 2
      examples/server.rs
  3. 4
      fuzz/Cargo.toml
  4. 2
      fuzz/fuzz_targets/parse_frame_header.rs
  5. 2
      fuzz/fuzz_targets/read_message_client.rs
  6. 2
      fuzz/fuzz_targets/read_message_server.rs
  7. 0
      fuzz/seeds/parse_frame_header/0a476920bf2b922f5d1955e5bedfedec8c13765c
  8. 0
      fuzz/seeds/parse_frame_header/0b3a842c0b93a42d724bbab2504ae92f0593232a
  9. 0
      fuzz/seeds/parse_frame_header/0e1990ce3bb9b7c62d5e11b83514467eca3be187
  10. 0
      fuzz/seeds/parse_frame_header/0fe89d4911d87df66f98147a0fd1f3624f34cdaa
  11. 0
      fuzz/seeds/parse_frame_header/370ee4d5682fb8c2770cac1ec89e0f6674990afb
  12. 0
      fuzz/seeds/parse_frame_header/47167be8eb1f00d36f0a93a8f61c77b4d4d2f515
  13. 0
      fuzz/seeds/parse_frame_header/51112bf88688ede230dd473db8fa0be2b945e636
  14. 0
      fuzz/seeds/parse_frame_header/7a0edc789a1d22737bfbb4cb29f26d994614cb80
  15. 0
      fuzz/seeds/parse_frame_header/9cf76db0663ea4025bbf89de1dfbb2ff818a26ae
  16. 0
      fuzz/seeds/parse_frame_header/bc4057a2970c60f0c2b07200842b8d0c15e2af27
  17. 0
      fuzz/seeds/parse_frame_header/bd811f21956aa72bb5a9114bdf4dc61e1710ad44
  18. 0
      fuzz/seeds/parse_frame_header/dc048e38bdb36cf80cc4fe5b3b21ea094510af4a
  19. 0
      fuzz/seeds/parse_frame_header/ec7d070174e40ace678006d0c631026f1d9a0779
  20. 0
      fuzz/seeds/parse_frame_header/ef420abfddbda7b9ee665d85ef62e4a437554003
  21. 0
      fuzz/seeds/parse_frame_header/fab3e168d56fecfc59f98a69f75673a53e94c215
  22. 29
      src/error.rs
  23. 71
      src/protocol/mod.rs

@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
readme = "README.md" readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs" homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite/0.7.0" documentation = "https://docs.rs/tungstenite/0.8.0"
repository = "https://github.com/snapview/tungstenite-rs" repository = "https://github.com/snapview/tungstenite-rs"
version = "0.7.0" version = "0.8.0"
[features] [features]
default = ["tls"] default = ["tls"]

@ -1,4 +1,5 @@
extern crate tungstenite; extern crate tungstenite;
extern crate env_logger;
use std::thread::spawn; use std::thread::spawn;
use std::net::TcpListener; use std::net::TcpListener;
@ -7,6 +8,7 @@ use tungstenite::accept_hdr;
use tungstenite::handshake::server::Request; use tungstenite::handshake::server::Request;
fn main() { fn main() {
env_logger::init();
let server = TcpListener::bind("127.0.0.1:3012").unwrap(); let server = TcpListener::bind("127.0.0.1:3012").unwrap();
for stream in server.incoming() { for stream in server.incoming() {
spawn(move || { spawn(move || {

@ -18,8 +18,8 @@ git = "https://github.com/rust-fuzz/libfuzzer-sys.git"
members = ["."] members = ["."]
[[bin]] [[bin]]
name = "parse_frame" name = "parse_frame_header"
path = "fuzz_targets/parse_frame.rs" path = "fuzz_targets/parse_frame_header.rs"
[[bin]] [[bin]]
name = "read_message_server" name = "read_message_server"

@ -8,5 +8,5 @@ fuzz_target!(|data: &[u8]| {
let vector: Vec<u8> = data.into(); let vector: Vec<u8> = data.into();
let mut cursor = Cursor::new(vector); let mut cursor = Cursor::new(vector);
tungstenite::protocol::frame::Frame::parse(&mut cursor); tungstenite::protocol::frame::FrameHeader::parse(&mut cursor).ok();
}); });

@ -33,5 +33,5 @@ fuzz_target!(|data: &[u8]| {
//let vector: Vec<u8> = data.into(); //let vector: Vec<u8> = data.into();
let cursor = Cursor::new(data); let cursor = Cursor::new(data);
let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Client, None); let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Client, None);
socket.read_message(); socket.read_message().ok();
}); });

@ -33,5 +33,5 @@ fuzz_target!(|data: &[u8]| {
//let vector: Vec<u8> = data.into(); //let vector: Vec<u8> = data.into();
let cursor = Cursor::new(data); let cursor = Cursor::new(data);
let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Server, None); let mut socket = WebSocket::from_raw_socket(WriteMoc(cursor), Role::Server, None);
socket.read_message(); socket.read_message().ok();
}); });

@ -11,7 +11,6 @@ use std::string;
use httparse; use httparse;
use protocol::frame::CloseFrame;
use protocol::Message; use protocol::Message;
#[cfg(feature="tls")] #[cfg(feature="tls")]
@ -26,8 +25,20 @@ pub type Result<T> = result::Result<T, Error>;
/// Possible WebSocket errors /// Possible WebSocket errors
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// WebSocket connection closed (normally) /// WebSocket connection closed normally
ConnectionClosed(Option<CloseFrame<'static>>), ///
/// Upon receiving this, the server must drop the WebSocket object as soon as possible
/// to close the connection.
/// The client gets this error if the connection is already closed at the server side.
///
/// Receiving this error means that the WebSocket object is not usable anymore and the only
/// meaningful action with it is dropping it.
ConnectionClosed,
/// Trying to work with already closed connection
///
/// Trying to write after receiving `Message::Close` or trying to read after receiving
/// `Error::ConnectionClosed` causes this.
AlreadyClosed,
/// Input-output error /// Input-output error
Io(io::Error), Io(io::Error),
#[cfg(feature="tls")] #[cfg(feature="tls")]
@ -50,13 +61,8 @@ pub enum Error {
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::ConnectionClosed(ref frame) => { Error::ConnectionClosed => write!(f, "Connection closed normally"),
if let Some(ref cf) = *frame { Error::AlreadyClosed => write!(f, "Trying to work with closed connection"),
write!(f, "Connection closed: {}", cf)
} else {
write!(f, "Connection closed (empty close frame)")
}
}
Error::Io(ref err) => write!(f, "IO error: {}", err), Error::Io(ref err) => write!(f, "IO error: {}", err),
#[cfg(feature="tls")] #[cfg(feature="tls")]
Error::Tls(ref err) => write!(f, "TLS error: {}", err), Error::Tls(ref err) => write!(f, "TLS error: {}", err),
@ -73,7 +79,8 @@ impl fmt::Display for Error {
impl ErrorTrait for Error { impl ErrorTrait for Error {
fn description(&self) -> &str { fn description(&self) -> &str {
match *self { match *self {
Error::ConnectionClosed(_) => "A close handshake is performed", Error::ConnectionClosed => "A close handshake is performed",
Error::AlreadyClosed => "Trying to read or write after getting close notification",
Error::Io(ref err) => err.description(), Error::Io(ref err) => err.description(),
#[cfg(feature="tls")] #[cfg(feature="tls")]
Error::Tls(ref err) => err.description(), Error::Tls(ref err) => err.description(),

@ -8,7 +8,7 @@ pub use self::message::Message;
pub use self::frame::CloseFrame; pub use self::frame::CloseFrame;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{Read, Write}; use std::io::{Read, Write, ErrorKind as IoErrorKind};
use std::mem::replace; use std::mem::replace;
use error::{Error, Result}; use error::{Error, Result};
@ -206,6 +206,9 @@ impl WebSocketContext {
where where
Stream: Read + Write, Stream: Read + Write,
{ {
// Do not read from already closed connections.
self.state.check_active()?;
loop { loop {
// Since we may get ping or close, we need to reply to the messages even during read. // Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking. // Thus we call write_pending() but ignore its blocking.
@ -231,6 +234,9 @@ impl WebSocketContext {
where where
Stream: Read + Write, Stream: Read + Write,
{ {
// Do not write to already closed connections.
self.state.check_active()?;
if let Some(max_send_queue) = self.config.max_send_queue { if let Some(max_send_queue) = self.config.max_send_queue {
if self.send_queue.len() >= max_send_queue { if self.send_queue.len() >= max_send_queue {
// Try to make some room for the new message. // Try to make some room for the new message.
@ -288,17 +294,14 @@ impl WebSocketContext {
// willing to take more data. // willing to take more data.
// If we're closing and there is nothing to send anymore, we should close the connection. // If we're closing and there is nothing to send anymore, we should close the connection.
if let WebSocketState::ClosedByPeer(ref mut frame) = self.state { if let (Role::Server, WebSocketState::ClosedByPeer) = (&self.role, &self.state) {
// The underlying TCP connection, in most normal cases, SHOULD be closed // The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the // first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2 // client (as this would prevent it from re-opening the connection for 2
// maximum segment lifetimes (2MSL), while there is no corresponding // maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon // server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455) // a new SYN with a higher seq number). (RFC 6455)
match self.role { Err(Error::ConnectionClosed)
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed(frame.take())),
}
} else { } else {
Ok(()) Ok(())
} }
@ -449,9 +452,10 @@ impl WebSocketContext {
} // match opcode } // match opcode
} else { } else {
// Connection closed by peer
match replace(&mut self.state, WebSocketState::Terminated) { match replace(&mut self.state, WebSocketState::Terminated) {
WebSocketState::CloseAcknowledged(close) | WebSocketState::ClosedByPeer(close) => { WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
Ok(Some(Message::Close(close))) Err(Error::ConnectionClosed)
} }
_ => { _ => {
Err(Error::Protocol("Connection reset without closing handshake".into())) Err(Error::Protocol("Connection reset without closing handshake".into()))
@ -461,13 +465,12 @@ impl WebSocketContext {
} }
/// Received a close frame. Tells if we need to return a close frame to the user. /// Received a close frame. Tells if we need to return a close frame to the user.
fn do_close(&mut self, close: Option<CloseFrame>) -> Option<Option<CloseFrame<'static>>> { fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
debug!("Received close frame: {:?}", close); debug!("Received close frame: {:?}", close);
match self.state { match self.state {
WebSocketState::Active => { WebSocketState::Active => {
let close_code = close.as_ref().map(|f| f.code); let close_code = close.as_ref().map(|f| f.code);
let close = close.map(CloseFrame::into_owned); self.state = WebSocketState::ClosedByPeer;
self.state = WebSocketState::ClosedByPeer(close.clone());
let reply = if let Some(code) = close_code { let reply = if let Some(code) = close_code {
if code.is_allowed() { if code.is_allowed() {
Frame::close(Some(CloseFrame { Frame::close(Some(CloseFrame {
@ -488,24 +491,14 @@ impl WebSocketContext {
Some(close) Some(close)
} }
WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => { WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
// It is already closed, just ignore. // It is already closed, just ignore.
None None
} }
WebSocketState::ClosedByUs => { WebSocketState::ClosedByUs => {
// We received a reply. // We received a reply.
let close = close.map(CloseFrame::into_owned); self.state = WebSocketState::CloseAcknowledged;
match self.role { Some(close)
Role::Client => {
// Client waits for the server to close the connection.
self.state = WebSocketState::CloseAcknowledged(close);
None
}
Role::Server => {
// Server closes the connection.
Some(close)
}
}
} }
WebSocketState::Terminated => unreachable!(), WebSocketState::Terminated => unreachable!(),
} }
@ -525,7 +518,20 @@ impl WebSocketContext {
frame.set_random_mask(); frame.set_random_mask();
} }
} }
self.frame.write_frame(stream, frame)
let res = self.frame.write_frame(stream, frame);
// An expected "Connection reset by peer" is not fatal
match res {
Err(Error::Io(err)) => Err({
match self.state {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged
if err.kind() == IoErrorKind::ConnectionReset =>
Error::ConnectionClosed,
_ => Error::Io(err),
}
}),
x => x,
}
} }
} }
@ -538,9 +544,9 @@ enum WebSocketState {
/// We initiated a close handshake. /// We initiated a close handshake.
ClosedByUs, ClosedByUs,
/// The peer initiated a close handshake. /// The peer initiated a close handshake.
ClosedByPeer(Option<CloseFrame<'static>>), ClosedByPeer,
/// The peer replied to our close handshake. /// The peer replied to our close handshake.
CloseAcknowledged(Option<CloseFrame<'static>>), CloseAcknowledged,
/// The connection does not exist anymore. /// The connection does not exist anymore.
Terminated, Terminated,
} }
@ -553,6 +559,17 @@ impl WebSocketState {
_ => false, _ => false,
} }
} }
/// Check if the state is active, return error if not.
fn check_active(&self) -> Result<()> {
match self {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged
=> Err(Error::ConnectionClosed),
WebSocketState::Terminated
=> Err(Error::AlreadyClosed),
_ => Ok(()),
}
}
} }
#[cfg(test)] #[cfg(test)]

Loading…
Cancel
Save