Merge pull request #58 from ubnt-intrepid/websocket-context

extract the context values from `WebSocket`
pull/59/head
Daniel Abramov 6 years ago committed by GitHub
commit 8ed73fd28a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 116
      src/protocol/frame/mod.rs
  2. 184
      src/protocol/mod.rs

@ -18,12 +18,8 @@ use error::{Error, Result};
pub struct FrameSocket<Stream> { pub struct FrameSocket<Stream> {
/// The underlying network stream. /// The underlying network stream.
stream: Stream, stream: Stream,
/// Buffer to read data from the stream. /// Codec for reading/writing frames.
in_buffer: InputBuffer, codec: FrameCodec,
/// Buffer to send packets to the network.
out_buffer: Vec<u8>,
/// Header and remaining size of the incoming packet being processed.
header: Option<(FrameHeader, u64)>,
} }
impl<Stream> FrameSocket<Stream> { impl<Stream> FrameSocket<Stream> {
@ -31,9 +27,7 @@ impl<Stream> FrameSocket<Stream> {
pub fn new(stream: Stream) -> Self { pub fn new(stream: Stream) -> Self {
FrameSocket { FrameSocket {
stream, stream,
in_buffer: InputBuffer::with_capacity(MIN_READ), codec: FrameCodec::new(),
out_buffer: Vec::new(),
header: None,
} }
} }
@ -41,15 +35,13 @@ impl<Stream> FrameSocket<Stream> {
pub fn from_partially_read(stream: Stream, part: Vec<u8>) -> Self { pub fn from_partially_read(stream: Stream, part: Vec<u8>) -> Self {
FrameSocket { FrameSocket {
stream, stream,
in_buffer: InputBuffer::from_partially_read(part), codec: FrameCodec::from_partially_read(part),
out_buffer: Vec::new(),
header: None,
} }
} }
/// Extract a stream from the socket. /// Extract a stream from the socket.
pub fn into_inner(self) -> (Stream, Vec<u8>) { pub fn into_inner(self) -> (Stream, Vec<u8>) {
(self.stream, self.in_buffer.into_vec()) (self.stream, self.codec.in_buffer.into_vec())
} }
/// Returns a shared reference to the inner stream. /// Returns a shared reference to the inner stream.
@ -68,6 +60,67 @@ impl<Stream> FrameSocket<Stream>
{ {
/// Read a frame from stream. /// Read a frame from stream.
pub fn read_frame(&mut self, max_size: Option<usize>) -> Result<Option<Frame>> { pub fn read_frame(&mut self, max_size: Option<usize>) -> Result<Option<Frame>> {
self.codec.read_frame(&mut self.stream, max_size)
}
}
impl<Stream> FrameSocket<Stream>
where Stream: Write
{
/// Write a frame to stream.
///
/// This function guarantees that the frame is queued regardless of any errors.
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete,
/// call write_pending() afterwards.
pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
self.codec.write_frame(&mut self.stream, frame)
}
/// Complete pending write, if any.
pub fn write_pending(&mut self) -> Result<()> {
self.codec.write_pending(&mut self.stream)
}
}
/// A codec for WebSocket frames.
#[derive(Debug)]
pub(super) struct FrameCodec {
/// Buffer to read data from the stream.
in_buffer: InputBuffer,
/// Buffer to send packets to the network.
out_buffer: Vec<u8>,
/// Header and remaining size of the incoming packet being processed.
header: Option<(FrameHeader, u64)>,
}
impl FrameCodec {
/// Create a new frame codec.
pub(super) fn new() -> Self {
Self {
in_buffer: InputBuffer::with_capacity(MIN_READ),
out_buffer: Vec::new(),
header: None,
}
}
/// Create a new frame codec from partially read data.
pub(super) fn from_partially_read(part: Vec<u8>) -> Self {
Self {
in_buffer: InputBuffer::from_partially_read(part),
out_buffer: Vec::new(),
header: None,
}
}
/// Read a frame from the provided stream.
pub(super) fn read_frame<Stream>(
&mut self,
stream: &mut Stream,
max_size: Option<usize>,
) -> Result<Option<Frame>>
where
Stream: Read,
{
let max_size = max_size.unwrap_or_else(usize::max_value); let max_size = max_size.unwrap_or_else(usize::max_value);
let payload = loop { let payload = loop {
@ -81,7 +134,7 @@ impl<Stream> FrameSocket<Stream>
if let Some((_, ref length)) = self.header { if let Some((_, ref length)) = self.header {
let length = *length; let length = *length;
// Enforce frame size limit early and make sure `length` // Enforce frame size limit early and make sure `length`
// is not too big (fits into `usize`). // is not too big (fits into `usize`).
if length > max_size as u64 { if length > max_size as u64 {
return Err(Error::Capacity( return Err(Error::Capacity(
@ -105,7 +158,7 @@ impl<Stream> FrameSocket<Stream>
let size = self.in_buffer.prepare_reserve(MIN_READ) let size = self.in_buffer.prepare_reserve(MIN_READ)
.with_limit(usize::max_value()) .with_limit(usize::max_value())
.map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))? .map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))?
.read_from(&mut self.stream)?; .read_from(stream)?;
if size == 0 { if size == 0 {
trace!("no frame received"); trace!("no frame received");
return Ok(None) return Ok(None)
@ -119,34 +172,35 @@ impl<Stream> FrameSocket<Stream>
Ok(Some(frame)) Ok(Some(frame))
} }
} /// Write a frame to the provided stream.
pub(super) fn write_frame<Stream>(
impl<Stream> FrameSocket<Stream> &mut self,
where Stream: Write stream: &mut Stream,
{ frame: Frame,
/// Write a frame to stream. ) -> Result<()>
/// where
/// This function guarantees that the frame is queued regardless of any errors. Stream: Write,
/// There is no need to resend the frame. In order to handle WouldBlock or Incomplete, {
/// call write_pending() afterwards.
pub fn write_frame(&mut self, frame: Frame) -> Result<()> {
trace!("writing frame {}", frame); trace!("writing frame {}", frame);
self.out_buffer.reserve(frame.len()); self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
self.write_pending() self.write_pending(stream)
} }
/// Complete pending write, if any. /// Complete pending write, if any.
pub fn write_pending(&mut self) -> Result<()> { pub(super) fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()>
where
Stream: Write,
{
while !self.out_buffer.is_empty() { while !self.out_buffer.is_empty() {
let len = self.stream.write(&self.out_buffer)?; let len = stream.write(&self.out_buffer)?;
self.out_buffer.drain(0..len); self.out_buffer.drain(0..len);
} }
self.stream.flush()?; stream.flush()?;
Ok(()) Ok(())
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

@ -13,7 +13,7 @@ use std::mem::replace;
use error::{Error, Result}; use error::{Error, Result};
use self::message::{IncompleteMessage, IncompleteMessageType}; use self::message::{IncompleteMessage, IncompleteMessageType};
use self::frame::{Frame, FrameSocket}; use self::frame::{Frame, FrameCodec};
use self::frame::coding::{OpCode, Data as OpData, Control as OpCtl, CloseCode}; use self::frame::coding::{OpCode, Data as OpData, Control as OpCtl, CloseCode};
use util::NonBlockingResult; use util::NonBlockingResult;
@ -60,20 +60,10 @@ impl Default for WebSocketConfig {
/// It may be created by calling `connect`, `accept` or `client` functions. /// It may be created by calling `connect`, `accept` or `client` functions.
#[derive(Debug)] #[derive(Debug)]
pub struct WebSocket<Stream> { pub struct WebSocket<Stream> {
/// Server or client?
role: Role,
/// The underlying socket. /// The underlying socket.
socket: FrameSocket<Stream>, socket: Stream,
/// The state of processing, either "active" or "closing". /// The context for managing a WebSocket.
state: WebSocketState, context: WebSocketContext,
/// Receive: an incomplete message being processed.
incomplete: Option<IncompleteMessage>,
/// Send: a data send queue.
send_queue: VecDeque<Frame>,
/// Send: an OOB pong message.
pong: Option<Frame>,
/// The configuration for the websocket session.
config: WebSocketConfig,
} }
impl<Stream> WebSocket<Stream> { impl<Stream> WebSocket<Stream> {
@ -83,7 +73,10 @@ impl<Stream> WebSocket<Stream> {
/// or together with an existing one. If you need an initial handshake, use /// or together with an existing one. If you need an initial handshake, use
/// `connect()` or `accept()` functions of the crate to construct a websocket. /// `connect()` or `accept()` functions of the crate to construct a websocket.
pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self { pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
WebSocket::from_frame_socket(FrameSocket::new(stream), role, config) WebSocket {
socket: stream,
context: WebSocketContext::new(role, config),
}
} }
/// Convert a raw socket into a WebSocket without performing a handshake. /// Convert a raw socket into a WebSocket without performing a handshake.
@ -97,34 +90,89 @@ impl<Stream> WebSocket<Stream> {
role: Role, role: Role,
config: Option<WebSocketConfig>, config: Option<WebSocketConfig>,
) -> Self { ) -> Self {
WebSocket::from_frame_socket(FrameSocket::from_partially_read(stream, part), role, config) WebSocket {
socket: stream,
context: WebSocketContext::from_partially_read(part, role, config),
}
} }
/// Returns a shared reference to the inner stream. /// Returns a shared reference to the inner stream.
pub fn get_ref(&self) -> &Stream { pub fn get_ref(&self) -> &Stream {
self.socket.get_ref() &self.socket
} }
/// Returns a mutable reference to the inner stream. /// Returns a mutable reference to the inner stream.
pub fn get_mut(&mut self) -> &mut Stream { pub fn get_mut(&mut self) -> &mut Stream {
self.socket.get_mut() &mut self.socket
} }
/// Change the configuration. /// Change the configuration.
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
set_func(&mut self.config) self.context.set_config(set_func)
} }
} }
impl<Stream> WebSocket<Stream> { impl<Stream: Read + Write> WebSocket<Stream> {
/// Convert a frame socket into a WebSocket. /// Read a message from stream, if possible.
fn from_frame_socket( ///
socket: FrameSocket<Stream>, /// This function sends pong and close responses automatically.
role: Role, /// However, it never blocks on write.
config: Option<WebSocketConfig> pub fn read_message(&mut self) -> Result<Message> {
) -> Self { self.context.read_message(&mut self.socket)
WebSocket { }
/// Send a message to stream, if possible.
///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
/// along with the passed message. Otherwise, the message is queued and Ok(()) is returned.
///
/// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued.
pub fn write_message(&mut self, message: Message) -> Result<()> {
self.context.write_message(&mut self.socket, message)
}
/// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> {
self.context.write_pending(&mut self.socket)
}
/// Close the connection.
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// the same as calling `write(Message::Close(..))`.
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
self.context.close(&mut self.socket, code)
}
}
/// A context for managing WebSocket stream.
#[derive(Debug)]
pub struct WebSocketContext {
/// Server or client?
role: Role,
/// encoder/decoder of frame.
frame: FrameCodec,
/// The state of processing, either "active" or "closing".
state: WebSocketState,
/// Receive: an incomplete message being processed.
incomplete: Option<IncompleteMessage>,
/// Send: a data send queue.
send_queue: VecDeque<Frame>,
/// Send: an OOB pong message.
pong: Option<Frame>,
/// The configuration for the websocket session.
config: WebSocketConfig,
}
impl WebSocketContext {
/// Create a WebSocket context that manages a post-handshake stream.
pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
WebSocketContext {
role, role,
socket, frame: FrameCodec::new(),
state: WebSocketState::Active, state: WebSocketState::Active,
incomplete: None, incomplete: None,
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
@ -132,28 +180,46 @@ impl<Stream> WebSocket<Stream> {
config: config.unwrap_or_else(WebSocketConfig::default), config: config.unwrap_or_else(WebSocketConfig::default),
} }
} }
}
impl<Stream: Read + Write> WebSocket<Stream> { /// Create a WebSocket context that manages an post-handshake stream.
/// Read a message from stream, if possible. pub fn from_partially_read(
part: Vec<u8>,
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
WebSocketContext {
frame: FrameCodec::from_partially_read(part),
..WebSocketContext::new(role, config)
}
}
/// Change the configuration.
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
set_func(&mut self.config)
}
/// Read a message from the provided stream, if possible.
/// ///
/// This function sends pong and close responses automatically. /// This function sends pong and close responses automatically.
/// However, it never blocks on write. /// However, it never blocks on write.
pub fn read_message(&mut self) -> Result<Message> { pub fn read_message<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
where
Stream: Read + Write,
{
loop { loop {
// Since we may get ping or close, we need to reply to the messages even during read. // Since we may get ping or close, we need to reply to the messages even during read.
// Thus we call write_pending() but ignore its blocking. // Thus we call write_pending() but ignore its blocking.
self.write_pending().no_block()?; self.write_pending(stream).no_block()?;
// If we get here, either write blocks or we have nothing to write. // If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock. // Thus if read blocks, just let it return WouldBlock.
if let Some(message) = self.read_message_frame()? { if let Some(message) = self.read_message_frame(stream)? {
trace!("Received message {}", message); trace!("Received message {}", message);
return Ok(message) return Ok(message)
} }
} }
} }
/// Send a message to stream, if possible. /// Send a message to the provided stream, if possible.
/// ///
/// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping /// WebSocket will buffer a configurable number of messages at a time, except to reply to Ping
/// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned /// and Close requests. If the WebSocket's send queue is full, `SendQueueFull` will be returned
@ -161,13 +227,16 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// ///
/// Note that only the last pong frame is stored to be sent, and only the /// Note that only the last pong frame is stored to be sent, and only the
/// most recent pong frame is sent if multiple pong frames are queued. /// most recent pong frame is sent if multiple pong frames are queued.
pub fn write_message(&mut self, message: Message) -> Result<()> { pub fn write_message<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
where
Stream: Read + Write,
{
if let Some(max_send_queue) = self.config.max_send_queue { if let Some(max_send_queue) = self.config.max_send_queue {
if self.send_queue.len() >= max_send_queue { if self.send_queue.len() >= max_send_queue {
// Try to make some room for the new message. // Try to make some room for the new message.
// Do not return here if write would block, ignore WouldBlock silently // Do not return here if write would block, ignore WouldBlock silently
// since we must queue the message anyway. // since we must queue the message anyway.
self.write_pending().no_block()?; self.write_pending(stream).no_block()?;
} }
if self.send_queue.len() >= max_send_queue { if self.send_queue.len() >= max_send_queue {
@ -185,31 +254,34 @@ impl<Stream: Read + Write> WebSocket<Stream> {
Message::Ping(data) => Frame::ping(data), Message::Ping(data) => Frame::ping(data),
Message::Pong(data) => { Message::Pong(data) => {
self.pong = Some(Frame::pong(data)); self.pong = Some(Frame::pong(data));
return self.write_pending() return self.write_pending(stream)
} }
Message::Close(code) => { Message::Close(code) => {
return self.close(code) return self.close(stream, code)
} }
}; };
self.send_queue.push_back(frame); self.send_queue.push_back(frame);
self.write_pending() self.write_pending(stream)
} }
/// Flush the pending send queue. /// Flush the pending send queue.
pub fn write_pending(&mut self) -> Result<()> { pub fn write_pending<Stream>(&mut self, stream: &mut Stream) -> Result<()>
where
Stream: Read + Write,
{
// First, make sure we have no pending frame sending. // First, make sure we have no pending frame sending.
self.socket.write_pending()?; self.frame.write_pending(stream)?;
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame. It SHOULD // response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455) // respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = self.pong.take() { if let Some(pong) = self.pong.take() {
self.send_one_frame(pong)?; self.send_one_frame(stream, pong)?;
} }
// If we have any unsent frames, send them. // If we have any unsent frames, send them.
while let Some(data) = self.send_queue.pop_front() { while let Some(data) = self.send_queue.pop_front() {
self.send_one_frame(data)?; self.send_one_frame(stream, data)?;
} }
// If we get to this point, the send queue is empty and the underlying socket is still // If we get to this point, the send queue is empty and the underlying socket is still
@ -237,7 +309,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
/// This function guarantees that the close frame will be queued. /// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is /// There is no need to call it again. Calling this function is
/// the same as calling `write(Message::Close(..))`. /// the same as calling `write(Message::Close(..))`.
pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> { pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
where
Stream: Read + Write,
{
if let WebSocketState::Active = self.state { if let WebSocketState::Active = self.state {
self.state = WebSocketState::ClosedByUs; self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(code); let frame = Frame::close(code);
@ -245,14 +320,17 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} else { } else {
// Already closed, nothing to do. // Already closed, nothing to do.
} }
self.write_pending() self.write_pending(stream)
} }
} }
impl<Stream: Read + Write> WebSocket<Stream> { impl WebSocketContext {
/// Try to decode one message frame. May return None. /// Try to decode one message frame. May return None.
fn read_message_frame(&mut self) -> Result<Option<Message>> { fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>>
if let Some(mut frame) = self.socket.read_frame(self.config.max_frame_size)? { where
Stream: Read + Write,
{
if let Some(mut frame) = self.frame.read_frame(stream, self.config.max_frame_size)? {
// MUST be 0 unless an extension is negotiated that defines meanings // MUST be 0 unless an extension is negotiated that defines meanings
// for non-zero values. If a nonzero value is received and none of // for non-zero values. If a nonzero value is received and none of
@ -434,7 +512,10 @@ impl<Stream: Read + Write> WebSocket<Stream> {
} }
/// Send a single pending frame. /// Send a single pending frame.
fn send_one_frame(&mut self, mut frame: Frame) -> Result<()> { fn send_one_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
where
Stream: Read + Write,
{
match self.role { match self.role {
Role::Server => { Role::Server => {
} }
@ -444,10 +525,11 @@ impl<Stream: Read + Write> WebSocket<Stream> {
frame.set_random_mask(); frame.set_random_mask();
} }
} }
self.socket.write_frame(frame) self.frame.write_frame(stream, frame)
} }
} }
/// The current connection state. /// The current connection state.
#[derive(Debug)] #[derive(Debug)]
enum WebSocketState { enum WebSocketState {

Loading…
Cancel
Save