Merge branch 'devel'

pull/12/head
Alexey Galakhov 8 years ago
commit 4e10065975
  1. 4
      Cargo.toml
  2. 4
      examples/autobahn-client.rs
  3. 2
      examples/client.rs
  4. 2
      src/client.rs
  5. 14
      src/error.rs
  6. 17
      src/handshake/client.rs
  7. 17
      src/protocol/frame/coding.rs
  8. 34
      src/protocol/frame/frame.rs
  9. 1
      src/protocol/frame/mod.rs
  10. 81
      src/protocol/mod.rs

@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"]
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs" homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite/0.1.1" documentation = "https://docs.rs/tungstenite/0.2.0"
repository = "https://github.com/snapview/tungstenite-rs" repository = "https://github.com/snapview/tungstenite-rs"
version = "0.1.2" version = "0.2.0"
[features] [features]
default = ["tls"] default = ["tls"]

@ -14,7 +14,7 @@ fn get_case_count() -> Result<u32> {
Url::parse("ws://localhost:9001/getCaseCount").unwrap() Url::parse("ws://localhost:9001/getCaseCount").unwrap()
)?; )?;
let msg = socket.read_message()?; let msg = socket.read_message()?;
socket.close()?; socket.close(None)?;
Ok(msg.into_text()?.parse::<u32>().unwrap()) Ok(msg.into_text()?.parse::<u32>().unwrap())
} }
@ -22,7 +22,7 @@ fn update_reports() -> Result<()> {
let mut socket = connect( let mut socket = connect(
Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap() Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap()
)?; )?;
socket.close()?; socket.close(None)?;
Ok(()) Ok(())
} }

@ -16,6 +16,6 @@ fn main() {
let msg = socket.read_message().expect("Error reading message"); let msg = socket.read_message().expect("Error reading message");
println!("Received: {}", msg); println!("Received: {}", msg);
} }
// socket.close(); // socket.close(None);
} }

@ -119,6 +119,6 @@ pub fn url_mode(url: &Url) -> Result<Mode> {
pub fn client<Stream: Read + Write>(url: Url, stream: Stream) pub fn client<Stream: Read + Write>(url: Url, stream: Stream)
-> StdResult<WebSocket<Stream>, HandshakeError<Stream, ClientHandshake>> -> StdResult<WebSocket<Stream>, HandshakeError<Stream, ClientHandshake>>
{ {
let request = Request { url: url }; let request = Request { url: url, extra_headers: None };
ClientHandshake::start(stream, request).handshake() ClientHandshake::start(stream, request).handshake()
} }

@ -11,6 +11,8 @@ use std::string;
use httparse; use httparse;
use protocol::frame::CloseFrame;
#[cfg(feature="tls")] #[cfg(feature="tls")]
pub mod tls { pub mod tls {
pub use native_tls::Error; pub use native_tls::Error;
@ -22,7 +24,7 @@ pub type Result<T> = result::Result<T, Error>;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
/// WebSocket connection closed (normally) /// WebSocket connection closed (normally)
ConnectionClosed, ConnectionClosed(Option<CloseFrame<'static>>),
/// Input-output error /// Input-output error
Io(io::Error), Io(io::Error),
#[cfg(feature="tls")] #[cfg(feature="tls")]
@ -43,7 +45,13 @@ pub enum Error {
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self { match *self {
Error::ConnectionClosed => write!(f, "Connection closed"), Error::ConnectionClosed(ref frame) => {
if let Some(ref cf) = *frame {
write!(f, "Connection closed: {}", cf)
} else {
write!(f, "Connection closed (empty close frame)")
}
}
Error::Io(ref err) => write!(f, "IO error: {}", err), Error::Io(ref err) => write!(f, "IO error: {}", err),
#[cfg(feature="tls")] #[cfg(feature="tls")]
Error::Tls(ref err) => write!(f, "TLS error: {}", err), Error::Tls(ref err) => write!(f, "TLS error: {}", err),
@ -59,7 +67,7 @@ impl fmt::Display for Error {
impl ErrorTrait for Error { impl ErrorTrait for Error {
fn description(&self) -> &str { fn description(&self) -> &str {
match *self { match *self {
Error::ConnectionClosed => "", Error::ConnectionClosed(_) => "A close handshake is performed",
Error::Io(ref err) => err.description(), Error::Io(ref err) => err.description(),
#[cfg(feature="tls")] #[cfg(feature="tls")]
Error::Tls(ref err) => err.description(), Error::Tls(ref err) => err.description(),

@ -13,12 +13,12 @@ use super::{MidHandshake, HandshakeRole, ProcessingResult, convert_key};
use super::machine::{HandshakeMachine, StageResult, TryParse}; use super::machine::{HandshakeMachine, StageResult, TryParse};
/// Client request. /// Client request.
pub struct Request { pub struct Request<'t> {
pub url: Url, pub url: Url,
// TODO extra headers pub extra_headers: Option<&'t [(&'t str, &'t str)]>,
} }
impl Request { impl<'t> Request<'t> {
/// The GET part of the request. /// The GET part of the request.
fn get_path(&self) -> String { fn get_path(&self) -> String {
if let Some(query) = self.url.query() { if let Some(query) = self.url.query() {
@ -56,9 +56,14 @@ impl ClientHandshake {
Connection: upgrade\r\n\ Connection: upgrade\r\n\
Upgrade: websocket\r\n\ Upgrade: websocket\r\n\
Sec-WebSocket-Version: 13\r\n\ Sec-WebSocket-Version: 13\r\n\
Sec-WebSocket-Key: {key}\r\n\ Sec-WebSocket-Key: {key}\r\n",
\r\n", host = request.get_host(), path = request.get_path(), key = key) host = request.get_host(), path = request.get_path(), key = key).unwrap();
.unwrap(); if let Some(eh) = request.extra_headers {
for &(k, v) in eh {
write!(req, "{}: {}\r\n", k, v).unwrap();
}
}
write!(req, "\r\n").unwrap();
HandshakeMachine::start_write(stream, req) HandshakeMachine::start_write(stream, req)
}; };

@ -193,9 +193,16 @@ impl CloseCode {
} }
} }
impl Into<u16> for CloseCode { impl fmt::Display for CloseCode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let code: u16 = self.into();
write!(f, "{}", code)
}
}
impl<'t> Into<u16> for &'t CloseCode {
fn into(self) -> u16 { fn into(self) -> u16 {
match self { match *self {
Normal => 1000, Normal => 1000,
Away => 1001, Away => 1001,
Protocol => 1002, Protocol => 1002,
@ -218,6 +225,12 @@ impl Into<u16> for CloseCode {
} }
} }
impl Into<u16> for CloseCode {
fn into(self) -> u16 {
(&self).into()
}
}
impl From<u16> for CloseCode { impl From<u16> for CloseCode {
fn from(code: u16) -> CloseCode { fn from(code: u16) -> CloseCode {
match code { match code {

@ -1,4 +1,5 @@
use std::fmt; use std::fmt;
use std::borrow::Cow;
use std::mem::transmute; use std::mem::transmute;
use std::io::{Cursor, Read, Write, ErrorKind}; use std::io::{Cursor, Read, Write, ErrorKind};
use std::default::Default; use std::default::Default;
@ -43,6 +44,31 @@ fn generate_mask() -> [u8; 4] {
rand::random() rand::random()
} }
/// A struct representing the close command.
#[derive(Debug, Clone)]
pub struct CloseFrame<'t> {
/// The reason as a code.
pub code: CloseCode,
/// The reason as text string.
pub reason: Cow<'t, str>,
}
impl<'t> CloseFrame<'t> {
/// Convert into a owned string.
pub fn into_owned(self) -> CloseFrame<'static> {
CloseFrame {
code: self.code,
reason: self.reason.into_owned().into(),
}
}
}
impl<'t> fmt::Display for CloseFrame<'t> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} ({})", self.reason, self.code)
}
}
/// A struct representing a WebSocket frame. /// A struct representing a WebSocket frame.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Frame { pub struct Frame {
@ -215,7 +241,7 @@ impl Frame {
/// Consume the frame into a closing frame. /// Consume the frame into a closing frame.
#[inline] #[inline]
pub fn into_close(self) -> Result<Option<(CloseCode, String)>> { pub fn into_close(self) -> Result<Option<CloseFrame<'static>>> {
match self.payload.len() { match self.payload.len() {
0 => Ok(None), 0 => Ok(None),
1 => Err(Error::Protocol("Invalid close sequence".into())), 1 => Err(Error::Protocol("Invalid close sequence".into())),
@ -224,7 +250,7 @@ impl Frame {
let code = NetworkEndian::read_u16(&data[0..2]).into(); let code = NetworkEndian::read_u16(&data[0..2]).into();
data.drain(0..2); data.drain(0..2);
let text = String::from_utf8(data)?; let text = String::from_utf8(data)?;
Ok(Some((code, text))) Ok(Some(CloseFrame { code: code, reason: text.into() }))
} }
} }
} }
@ -267,8 +293,8 @@ impl Frame {
/// Create a new Close control frame. /// Create a new Close control frame.
#[inline] #[inline]
pub fn close(msg: Option<(CloseCode, &str)>) -> Frame { pub fn close(msg: Option<CloseFrame>) -> Frame {
let payload = if let Some((code, reason)) = msg { let payload = if let Some(CloseFrame { code, reason }) = msg {
let raw: [u8; 2] = unsafe { let raw: [u8; 2] = unsafe {
let u: u16 = code.into(); let u: u16 = code.into();
transmute(u.to_be()) transmute(u.to_be())

@ -5,6 +5,7 @@ pub mod coding;
mod frame; mod frame;
pub use self::frame::Frame; pub use self::frame::Frame;
pub use self::frame::CloseFrame;
use std::io::{Read, Write}; use std::io::{Read, Write};

@ -5,9 +5,10 @@ pub mod frame;
mod message; mod message;
pub use self::message::Message; pub use self::message::Message;
pub use self::frame::CloseFrame;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{Read, Write}; use std::io::{Read, Write, ErrorKind as IoErrorKind};
use std::mem::replace; use std::mem::replace;
use error::{Error, Result}; use error::{Error, Result};
@ -89,7 +90,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.write_pending().no_block()?; self.write_pending().no_block()?;
// If we get here, either write blocks or we have nothing to write. // If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock. // Thus if read blocks, just let it return WouldBlock.
if let Some(message) = self.read_message_frame()? { let res = self.read_message_frame();
if let Some(message) = self.translate_close(res)? {
trace!("Received message {}", message); trace!("Received message {}", message);
return Ok(message) return Ok(message)
} }
@ -123,10 +125,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// ///
/// This function guarantees that the close frame will be queued. /// This function guarantees that the close frame will be queued.
/// There is no need to call it again, just like write_message(). /// There is no need to call it again, just like write_message().
pub fn close(&mut self) -> Result<()> { pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
if let WebSocketState::Active = self.state { if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs; self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(None); let frame = Frame::close(code);
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
} else { } else {
// Already closed, nothing to do. // Already closed, nothing to do.
@ -137,7 +139,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// Flush the pending send queue. /// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> { pub fn write_pending(&mut self) -> Result<()> {
// First, make sure we have no pending frame sending. // First, make sure we have no pending frame sending.
self.socket.write_pending()?; {
let res = self.socket.write_pending();
self.translate_close(res)?;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
@ -151,8 +156,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
// If we're closing and there is nothing to send anymore, we should close the connection. // If we're closing and there is nothing to send anymore, we should close the connection.
match self.state { if self.send_queue.is_empty() {
WebSocketState::ClosedByPeer if self.send_queue.is_empty() => { if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
// The underlying TCP connection, in most normal cases, SHOULD be closed // The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the // first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2 // client (as this would prevent it from re-opening the connection for 2
@ -161,10 +166,13 @@ impl<Stream: Read + Write> WebSocket<Stream> {
// a new SYN with a higher seq number). (RFC 6455) // a new SYN with a higher seq number). (RFC 6455)
match self.role { match self.role {
Role::Client => Ok(()), Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed), Role::Server => Err(Error::ConnectionClosed(replace(frame, None))),
} }
} else {
Ok(())
} }
_ => Ok(()), } else {
Ok(())
} }
} }
@ -290,36 +298,47 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
/// Received a close frame. /// Received a close frame.
fn do_close(&mut self, close: Option<(CloseCode, String)>) -> Result<()> { fn do_close(&mut self, close: Option<CloseFrame>) -> Result<()> {
debug!("Received close frame: {:?}", close);
match self.state { match self.state {
WebSocketState::Active => { WebSocketState::Active => {
self.state = WebSocketState::ClosedByPeer; let close_code = close.as_ref().map(|f| f.code);
let reply = if let Some((code, _)) = close { self.state = WebSocketState::ClosedByPeer(close.map(CloseFrame::into_owned));
let reply = if let Some(code) = close_code {
if code.is_allowed() { if code.is_allowed() {
Frame::close(Some((CloseCode::Normal, ""))) Frame::close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "".into(),
}))
} else { } else {
Frame::close(Some((CloseCode::Protocol, "Protocol violation"))) Frame::close(Some(CloseFrame {
code: CloseCode::Protocol,
reason: "Protocol violation".into()
}))
} }
} else { } else {
Frame::close(None) Frame::close(None)
}; };
debug!("Replying to close with {:?}", reply);
self.send_queue.push_back(reply); self.send_queue.push_back(reply);
Ok(()) Ok(())
} }
WebSocketState::ClosedByPeer => { WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => {
// It is already closed, just ignore. // It is already closed, just ignore.
Ok(()) Ok(())
} }
WebSocketState::ClosedByUs => { WebSocketState::ClosedByUs => {
// We received a reply. // We received a reply.
let close = close.map(CloseFrame::into_owned);
match self.role { match self.role {
Role::Client => { Role::Client => {
// Client waits for the server to close the connection. // Client waits for the server to close the connection.
self.state = WebSocketState::CloseAcknowledged(close);
Ok(()) Ok(())
} }
Role::Server => { Role::Server => {
// Server closes the connection. // Server closes the connection.
Err(Error::ConnectionClosed) Err(Error::ConnectionClosed(close))
} }
} }
} }
@ -358,16 +377,42 @@ impl<Stream: Read + Write> WebSocket<Stream> {
frame.set_mask(); frame.set_mask();
} }
} }
self.socket.write_frame(frame) let res = self.socket.write_frame(frame);
self.translate_close(res)
}
/// Translate a "Connection reset by peer" into ConnectionClosed as needed.
fn translate_close<T>(&mut self, res: Result<T>) -> Result<T> {
match res {
Err(Error::Io(err)) => Err({
if err.kind() == IoErrorKind::ConnectionReset {
match self.state {
WebSocketState::ClosedByPeer(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
WebSocketState::CloseAcknowledged(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
_ => Error::Io(err),
}
} else {
Error::Io(err)
}
}),
x => x,
}
} }
} }
/// The current connection state. /// The current connection state.
enum WebSocketState { enum WebSocketState {
/// The connection is active.
Active, Active,
/// We initiated a close handshake.
ClosedByUs, ClosedByUs,
ClosedByPeer, /// The peer initiated a close handshake.
ClosedByPeer(Option<CloseFrame<'static>>),
/// The peer replied to our close handshake.
CloseAcknowledged(Option<CloseFrame<'static>>),
} }
impl WebSocketState { impl WebSocketState {

Loading…
Cancel
Save