Merge pull request #17 from alexcrichton/ctl-pong

Add `Ping` and `Pong` variants to `Message`
pull/18/head
Daniel Abramov 8 years ago committed by GitHub
commit 7019210010
  1. 10
      examples/autobahn-client.rs
  2. 10
      examples/autobahn-server.rs
  3. 49
      src/protocol/message.rs
  4. 66
      src/protocol/mod.rs

@ -5,7 +5,7 @@ extern crate url;
use url::Url; use url::Url;
use tungstenite::{connect, Error, Result}; use tungstenite::{connect, Error, Result, Message};
const AGENT: &'static str = "Tungstenite"; const AGENT: &'static str = "Tungstenite";
@ -33,9 +33,15 @@ fn run_test(case: u32) -> Result<()> {
).unwrap(); ).unwrap();
let mut socket = connect(case_url)?; let mut socket = connect(case_url)?;
loop { loop {
let msg = socket.read_message()?; match socket.read_message()? {
msg @ Message::Text(_) |
msg @ Message::Binary(_) => {
socket.write_message(msg)?; socket.write_message(msg)?;
} }
Message::Ping(_) |
Message::Pong(_) => {}
}
}
} }
fn main() { fn main() {

@ -5,7 +5,7 @@ extern crate tungstenite;
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use std::thread::spawn; use std::thread::spawn;
use tungstenite::{accept, HandshakeError, Error, Result}; use tungstenite::{accept, HandshakeError, Error, Result, Message};
fn must_not_block<Stream, Role>(err: HandshakeError<Stream, Role>) -> Error { fn must_not_block<Stream, Role>(err: HandshakeError<Stream, Role>) -> Error {
match err { match err {
@ -17,9 +17,15 @@ fn must_not_block<Stream, Role>(err: HandshakeError<Stream, Role>) -> Error {
fn handle_client(stream: TcpStream) -> Result<()> { fn handle_client(stream: TcpStream) -> Result<()> {
let mut socket = accept(stream).map_err(must_not_block)?; let mut socket = accept(stream).map_err(must_not_block)?;
loop { loop {
let msg = socket.read_message()?; match socket.read_message()? {
msg @ Message::Text(_) |
msg @ Message::Binary(_) => {
socket.write_message(msg)?; socket.write_message(msg)?;
} }
Message::Ping(_) |
Message::Pong(_) => {}
}
}
} }
fn main() { fn main() {

@ -141,6 +141,14 @@ pub enum Message {
Text(String), Text(String),
/// A binary WebSocket message /// A binary WebSocket message
Binary(Vec<u8>), Binary(Vec<u8>),
/// A ping message with the specified payload
///
/// The payload here must have a length less than 125 bytes
Ping(Vec<u8>),
/// A pong message with the specified payload
///
/// The payload here must have a length less than 125 bytes
Pong(Vec<u8>),
} }
impl Message { impl Message {
@ -163,15 +171,31 @@ impl Message {
pub fn is_text(&self) -> bool { pub fn is_text(&self) -> bool {
match *self { match *self {
Message::Text(_) => true, Message::Text(_) => true,
Message::Binary(_) => false, _ => false,
} }
} }
/// Indicates whether a message is a binary message. /// Indicates whether a message is a binary message.
pub fn is_binary(&self) -> bool { pub fn is_binary(&self) -> bool {
match *self { match *self {
Message::Text(_) => false,
Message::Binary(_) => true, Message::Binary(_) => true,
_ => false,
}
}
/// Indicates whether a message is a ping message.
pub fn is_ping(&self) -> bool {
match *self {
Message::Ping(_) => true,
_ => false,
}
}
/// Indicates whether a message is a pong message.
pub fn is_pong(&self) -> bool {
match *self {
Message::Pong(_) => true,
_ => false,
} }
} }
@ -179,24 +203,25 @@ impl Message {
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
match *self { match *self {
Message::Text(ref string) => string.len(), Message::Text(ref string) => string.len(),
Message::Binary(ref data) => data.len(), Message::Binary(ref data) |
Message::Ping(ref data) |
Message::Pong(ref data) => data.len(),
} }
} }
/// Returns true if the WebSocket message has no content. /// Returns true if the WebSocket message has no content.
/// For example, if the other side of the connection sent an empty string. /// For example, if the other side of the connection sent an empty string.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
match *self { self.len() == 0
Message::Text(ref string) => string.is_empty(),
Message::Binary(ref data) => data.is_empty(),
}
} }
/// Consume the WebSocket and return it as binary data. /// Consume the WebSocket and return it as binary data.
pub fn into_data(self) -> Vec<u8> { pub fn into_data(self) -> Vec<u8> {
match self { match self {
Message::Text(string) => string.into_bytes(), Message::Text(string) => string.into_bytes(),
Message::Binary(data) => data, Message::Binary(data) |
Message::Ping(data) |
Message::Pong(data) => data,
} }
} }
@ -204,7 +229,9 @@ impl Message {
pub fn into_text(self) -> Result<String> { pub fn into_text(self) -> Result<String> {
match self { match self {
Message::Text(string) => Ok(string), Message::Text(string) => Ok(string),
Message::Binary(data) => Ok(try!( Message::Binary(data) |
Message::Ping(data) |
Message::Pong(data) => Ok(try!(
String::from_utf8(data).map_err(|err| err.utf8_error()))), String::from_utf8(data).map_err(|err| err.utf8_error()))),
} }
} }
@ -214,7 +241,9 @@ impl Message {
pub fn to_text(&self) -> Result<&str> { pub fn to_text(&self) -> Result<&str> {
match *self { match *self {
Message::Text(ref string) => Ok(string), Message::Text(ref string) => Ok(string),
Message::Binary(ref data) => Ok(try!(str::from_utf8(data))), Message::Binary(ref data) |
Message::Ping(ref data) |
Message::Pong(ref data) => Ok(try!(str::from_utf8(data))),
} }
} }

@ -103,24 +103,27 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// This function guarantees that the frame is queued regardless of any errors. /// This function guarantees that the frame is queued regardless of any errors.
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, /// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards. /// call write_pending() afterwards.
///
/// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued up.
pub fn write_message(&mut self, message: Message) -> Result<()> { pub fn write_message(&mut self, message: Message) -> Result<()> {
let frame = { let frame = match message {
let opcode = match message { Message::Text(data) => {
Message::Text(_) => OpData::Text, Frame::message(data.into(), OpCode::Data(OpData::Text), true)
Message::Binary(_) => OpData::Binary, }
}; Message::Binary(data) => {
Frame::message(message.into_data(), OpCode::Data(opcode), true) Frame::message(data, OpCode::Data(OpData::Binary), true)
}
Message::Ping(data) => Frame::ping(data),
Message::Pong(data) => {
self.pong = Some(Frame::pong(data));
return self.write_pending()
}
}; };
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
self.write_pending() self.write_pending()
} }
/// Send ping.
pub fn send_ping(&mut self, payload: Vec<u8>) -> Result<()> {
self.send_queue.push_back(Frame::ping(payload));
self.write_pending()
}
/// Close the connection. /// Close the connection.
/// ///
/// This function guarantees that the close frame will be queued. /// This function guarantees that the close frame will be queued.
@ -212,7 +215,7 @@ impl<Stream: Read + Write> WebSocket<Stream> {
match frame.opcode() { match frame.opcode() {
OpCode::Control(ctl) => { OpCode::Control(ctl) => {
(match ctl { match ctl {
// All control frames MUST have a payload length of 125 bytes or less // All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented. (RFC 6455) // and MUST NOT be fragmented. (RFC 6455)
_ if !frame.is_final() => { _ if !frame.is_final() => {
@ -222,22 +225,24 @@ impl<Stream: Read + Write> WebSocket<Stream> {
Err(Error::Protocol("Control frame too big".into())) Err(Error::Protocol("Control frame too big".into()))
} }
OpCtl::Close => { OpCtl::Close => {
self.do_close(frame.into_close()?) self.do_close(frame.into_close()?).map(|_| None)
} }
OpCtl::Reserved(i) => { OpCtl::Reserved(i) => {
Err(Error::Protocol(format!("Unknown control frame type {}", i).into())) Err(Error::Protocol(format!("Unknown control frame type {}", i).into()))
} }
OpCtl::Ping | OpCtl::Pong if !self.state.is_active() => { OpCtl::Ping | OpCtl::Pong if !self.state.is_active() => {
// No ping processing while closing. // No ping processing while closing.
Ok(()) Ok(None)
} }
OpCtl::Ping => { OpCtl::Ping => {
self.do_ping(frame.into_data()) let data = frame.into_data();
self.pong = Some(Frame::pong(data.clone()));
Ok(Some(Message::Ping(data)))
} }
OpCtl::Pong => { OpCtl::Pong => {
self.do_pong(frame.into_data()) Ok(Some(Message::Pong(frame.into_data())))
}
} }
}).map(|_| None)
} }
OpCode::Data(_) if !self.state.is_active() => { OpCode::Data(_) if !self.state.is_active() => {
@ -353,27 +358,6 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
} }
/// Received a ping frame.
fn do_ping(&mut self, ping: Vec<u8>) -> Result<()> {
// If an endpoint receives a Ping frame and has not yet sent Pong
// frame(s) in response to previous Ping frame(s), the endpoint MAY
// elect to send a Pong frame for only the most recently processed Ping
// frame. (RFC 6455)
// We do exactly that, keeping a "queue" from one and only Pong frame.
self.pong = Some(Frame::pong(ping));
Ok(())
}
/// Received a pong frame.
fn do_pong(&mut self, _: Vec<u8>) -> Result<()> {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected. (RFC 6455)
// Due to this, we just don't check pongs right now.
// TODO: check if there was a reply to our ping at all...
Ok(())
}
/// Send a single pending frame. /// Send a single pending frame.
fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> { fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> {
match self.role { match self.role {
@ -463,6 +447,8 @@ mod tests {
#[test] #[test]
fn receive_messages() { fn receive_messages() {
let incoming = Cursor::new(vec![ let incoming = Cursor::new(vec![
0x89, 0x02, 0x01, 0x02,
0x8a, 0x01, 0x03,
0x01, 0x07, 0x01, 0x07,
0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20,
0x80, 0x06, 0x80, 0x06,
@ -471,6 +457,8 @@ mod tests {
0x01, 0x02, 0x03, 0x01, 0x02, 0x03,
]); ]);
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client); let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client);
assert_eq!(socket.read_message().unwrap(), Message::Ping(vec![1, 2]));
assert_eq!(socket.read_message().unwrap(), Message::Pong(vec![3]));
assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into())); assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into()));
assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03])); assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
} }

Loading…
Cancel
Save