Merge branch 'devel'

pull/12/head
Alexey Galakhov 8 years ago
commit 9a3b29b05e
  1. 26
      Cargo.toml
  2. 4
      examples/autobahn-client.rs
  3. 2
      examples/client.rs
  4. 2
      src/client.rs
  5. 14
      src/error.rs
  6. 17
      src/handshake/client.rs
  7. 17
      src/protocol/frame/coding.rs
  8. 34
      src/protocol/frame/frame.rs
  9. 1
      src/protocol/frame/mod.rs
  10. 81
      src/protocol/mod.rs

@ -7,28 +7,28 @@ authors = ["Alexey Galakhov"]
license = "MIT"
readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite/0.1.1"
documentation = "https://docs.rs/tungstenite/"
repository = "https://github.com/snapview/tungstenite-rs"
version = "0.1.2"
version = "0.2.0"
[features]
default = ["tls"]
tls = ["native-tls"]
[dependencies]
base64 = "0.4.0"
byteorder = "1.0.0"
bytes = "0.4.1"
httparse = "1.2.1"
log = "0.3.7"
rand = "0.3.15"
sha1 = "0.2.0"
url = "1.4.0"
utf-8 = "0.7.0"
base64 = "*"
byteorder = "*"
bytes = "*"
httparse = "*"
log = "*"
rand = "*"
sha1 = "*"
url = "*"
utf-8 = "*"
[dependencies.native-tls]
optional = true
version = "0.1.1"
version = "*"
[dev-dependencies]
env_logger = "0.4.2"
env_logger = "*"

@ -14,7 +14,7 @@ fn get_case_count() -> Result<u32> {
Url::parse("ws://localhost:9001/getCaseCount").unwrap()
)?;
let msg = socket.read_message()?;
socket.close()?;
socket.close(None)?;
Ok(msg.into_text()?.parse::<u32>().unwrap())
}
@ -22,7 +22,7 @@ fn update_reports() -> Result<()> {
let mut socket = connect(
Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT)).unwrap()
)?;
socket.close()?;
socket.close(None)?;
Ok(())
}

@ -16,6 +16,6 @@ fn main() {
let msg = socket.read_message().expect("Error reading message");
println!("Received: {}", msg);
}
// socket.close();
// socket.close(None);
}

@ -119,6 +119,6 @@ pub fn url_mode(url: &Url) -> Result<Mode> {
pub fn client<Stream: Read + Write>(url: Url, stream: Stream)
-> StdResult<WebSocket<Stream>, HandshakeError<Stream, ClientHandshake>>
{
let request = Request { url: url };
let request = Request { url: url, extra_headers: None };
ClientHandshake::start(stream, request).handshake()
}

@ -11,6 +11,8 @@ use std::string;
use httparse;
use protocol::frame::CloseFrame;
#[cfg(feature="tls")]
pub mod tls {
pub use native_tls::Error;
@ -22,7 +24,7 @@ pub type Result<T> = result::Result<T, Error>;
#[derive(Debug)]
pub enum Error {
/// WebSocket connection closed (normally)
ConnectionClosed,
ConnectionClosed(Option<CloseFrame<'static>>),
/// Input-output error
Io(io::Error),
#[cfg(feature="tls")]
@ -43,7 +45,13 @@ pub enum Error {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::ConnectionClosed => write!(f, "Connection closed"),
Error::ConnectionClosed(ref frame) => {
if let Some(ref cf) = *frame {
write!(f, "Connection closed: {}", cf)
} else {
write!(f, "Connection closed (empty close frame)")
}
}
Error::Io(ref err) => write!(f, "IO error: {}", err),
#[cfg(feature="tls")]
Error::Tls(ref err) => write!(f, "TLS error: {}", err),
@ -59,7 +67,7 @@ impl fmt::Display for Error {
impl ErrorTrait for Error {
fn description(&self) -> &str {
match *self {
Error::ConnectionClosed => "",
Error::ConnectionClosed(_) => "A close handshake is performed",
Error::Io(ref err) => err.description(),
#[cfg(feature="tls")]
Error::Tls(ref err) => err.description(),

@ -13,12 +13,12 @@ use super::{MidHandshake, HandshakeRole, ProcessingResult, convert_key};
use super::machine::{HandshakeMachine, StageResult, TryParse};
/// Client request.
pub struct Request {
pub struct Request<'t> {
pub url: Url,
// TODO extra headers
pub extra_headers: Option<&'t [(&'t str, &'t str)]>,
}
impl Request {
impl<'t> Request<'t> {
/// The GET part of the request.
fn get_path(&self) -> String {
if let Some(query) = self.url.query() {
@ -56,9 +56,14 @@ impl ClientHandshake {
Connection: upgrade\r\n\
Upgrade: websocket\r\n\
Sec-WebSocket-Version: 13\r\n\
Sec-WebSocket-Key: {key}\r\n\
\r\n", host = request.get_host(), path = request.get_path(), key = key)
.unwrap();
Sec-WebSocket-Key: {key}\r\n",
host = request.get_host(), path = request.get_path(), key = key).unwrap();
if let Some(eh) = request.extra_headers {
for &(k, v) in eh {
write!(req, "{}: {}\r\n", k, v).unwrap();
}
}
write!(req, "\r\n").unwrap();
HandshakeMachine::start_write(stream, req)
};

@ -193,9 +193,16 @@ impl CloseCode {
}
}
impl Into<u16> for CloseCode {
impl fmt::Display for CloseCode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let code: u16 = self.into();
write!(f, "{}", code)
}
}
impl<'t> Into<u16> for &'t CloseCode {
fn into(self) -> u16 {
match self {
match *self {
Normal => 1000,
Away => 1001,
Protocol => 1002,
@ -218,6 +225,12 @@ impl Into<u16> for CloseCode {
}
}
impl Into<u16> for CloseCode {
fn into(self) -> u16 {
(&self).into()
}
}
impl From<u16> for CloseCode {
fn from(code: u16) -> CloseCode {
match code {

@ -1,4 +1,5 @@
use std::fmt;
use std::borrow::Cow;
use std::mem::transmute;
use std::io::{Cursor, Read, Write, ErrorKind};
use std::default::Default;
@ -43,6 +44,31 @@ fn generate_mask() -> [u8; 4] {
rand::random()
}
/// A struct representing the close command.
#[derive(Debug, Clone)]
pub struct CloseFrame<'t> {
/// The reason as a code.
pub code: CloseCode,
/// The reason as text string.
pub reason: Cow<'t, str>,
}
impl<'t> CloseFrame<'t> {
/// Convert into a owned string.
pub fn into_owned(self) -> CloseFrame<'static> {
CloseFrame {
code: self.code,
reason: self.reason.into_owned().into(),
}
}
}
impl<'t> fmt::Display for CloseFrame<'t> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{} ({})", self.reason, self.code)
}
}
/// A struct representing a WebSocket frame.
#[derive(Debug, Clone)]
pub struct Frame {
@ -215,7 +241,7 @@ impl Frame {
/// Consume the frame into a closing frame.
#[inline]
pub fn into_close(self) -> Result<Option<(CloseCode, String)>> {
pub fn into_close(self) -> Result<Option<CloseFrame<'static>>> {
match self.payload.len() {
0 => Ok(None),
1 => Err(Error::Protocol("Invalid close sequence".into())),
@ -224,7 +250,7 @@ impl Frame {
let code = NetworkEndian::read_u16(&data[0..2]).into();
data.drain(0..2);
let text = String::from_utf8(data)?;
Ok(Some((code, text)))
Ok(Some(CloseFrame { code: code, reason: text.into() }))
}
}
}
@ -267,8 +293,8 @@ impl Frame {
/// Create a new Close control frame.
#[inline]
pub fn close(msg: Option<(CloseCode, &str)>) -> Frame {
let payload = if let Some((code, reason)) = msg {
pub fn close(msg: Option<CloseFrame>) -> Frame {
let payload = if let Some(CloseFrame { code, reason }) = msg {
let raw: [u8; 2] = unsafe {
let u: u16 = code.into();
transmute(u.to_be())

@ -5,6 +5,7 @@ pub mod coding;
mod frame;
pub use self::frame::Frame;
pub use self::frame::CloseFrame;
use std::io::{Read, Write};

@ -5,9 +5,10 @@ pub mod frame;
mod message;
pub use self::message::Message;
pub use self::frame::CloseFrame;
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::io::{Read, Write, ErrorKind as IoErrorKind};
use std::mem::replace;
use error::{Error, Result};
@ -89,7 +90,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
self.write_pending().no_block()?;
// If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock.
if let Some(message) = self.read_message_frame()? {
let res = self.read_message_frame();
if let Some(message) = self.translate_close(res)? {
trace!("Received message {}", message);
return Ok(message)
}
@ -123,10 +125,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again, just like write_message().
pub fn close(&mut self) -> Result<()> {
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(None);
let frame = Frame::close(code);
self.send_queue.push_back(frame);
} else {
// Already closed, nothing to do.
@ -137,7 +139,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> {
// First, make sure we have no pending frame sending.
self.socket.write_pending()?;
{
let res = self.socket.write_pending();
self.translate_close(res)?;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD
@ -151,8 +156,8 @@ impl<Stream: Read + Write> WebSocket<Stream> {
}
// If we're closing and there is nothing to send anymore, we should close the connection.
match self.state {
WebSocketState::ClosedByPeer if self.send_queue.is_empty() => {
if self.send_queue.is_empty() {
if let WebSocketState::ClosedByPeer(ref mut frame) = self.state {
// The underlying TCP connection, in most normal cases, SHOULD be closed
// first by the server, so that it holds the TIME_WAIT state and not the
// client (as this would prevent it from re-opening the connection for 2
@ -161,10 +166,13 @@ impl<Stream: Read + Write> WebSocket<Stream> {
// a new SYN with a higher seq number). (RFC 6455)
match self.role {
Role::Client => Ok(()),
Role::Server => Err(Error::ConnectionClosed),
Role::Server => Err(Error::ConnectionClosed(replace(frame, None))),
}
} else {
Ok(())
}
_ => Ok(()),
} else {
Ok(())
}
}
@ -290,36 +298,47 @@ impl<Stream: Read + Write> WebSocket<Stream> {
}
/// Received a close frame.
fn do_close(&mut self, close: Option<(CloseCode, String)>) -> Result<()> {
fn do_close(&mut self, close: Option<CloseFrame>) -> Result<()> {
debug!("Received close frame: {:?}", close);
match self.state {
WebSocketState::Active => {
self.state = WebSocketState::ClosedByPeer;
let reply = if let Some((code, _)) = close {
let close_code = close.as_ref().map(|f| f.code);
self.state = WebSocketState::ClosedByPeer(close.map(CloseFrame::into_owned));
let reply = if let Some(code) = close_code {
if code.is_allowed() {
Frame::close(Some((CloseCode::Normal, "")))
Frame::close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "".into(),
}))
} else {
Frame::close(Some((CloseCode::Protocol, "Protocol violation")))
Frame::close(Some(CloseFrame {
code: CloseCode::Protocol,
reason: "Protocol violation".into()
}))
}
} else {
Frame::close(None)
};
debug!("Replying to close with {:?}", reply);
self.send_queue.push_back(reply);
Ok(())
}
WebSocketState::ClosedByPeer => {
WebSocketState::ClosedByPeer(_) | WebSocketState::CloseAcknowledged(_) => {
// It is already closed, just ignore.
Ok(())
}
WebSocketState::ClosedByUs => {
// We received a reply.
let close = close.map(CloseFrame::into_owned);
match self.role {
Role::Client => {
// Client waits for the server to close the connection.
self.state = WebSocketState::CloseAcknowledged(close);
Ok(())
}
Role::Server => {
// Server closes the connection.
Err(Error::ConnectionClosed)
Err(Error::ConnectionClosed(close))
}
}
}
@ -358,16 +377,42 @@ impl<Stream: Read + Write> WebSocket<Stream> {
frame.set_mask();
}
}
self.socket.write_frame(frame)
let res = self.socket.write_frame(frame);
self.translate_close(res)
}
/// Translate a "Connection reset by peer" into ConnectionClosed as needed.
fn translate_close<T>(&mut self, res: Result<T>) -> Result<T> {
match res {
Err(Error::Io(err)) => Err({
if err.kind() == IoErrorKind::ConnectionReset {
match self.state {
WebSocketState::ClosedByPeer(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
WebSocketState::CloseAcknowledged(ref mut frame) =>
Error::ConnectionClosed(replace(frame, None)),
_ => Error::Io(err),
}
} else {
Error::Io(err)
}
}),
x => x,
}
}
}
/// The current connection state.
enum WebSocketState {
/// The connection is active.
Active,
/// We initiated a close handshake.
ClosedByUs,
ClosedByPeer,
/// The peer initiated a close handshake.
ClosedByPeer(Option<CloseFrame<'static>>),
/// The peer replied to our close handshake.
CloseAcknowledged(Option<CloseFrame<'static>>),
}
impl WebSocketState {

Loading…
Cancel
Save