Add drain method for WebSocket{,Context}

This allows the user to retrieve the content of the message
queue after the socket connection died.
pull/95/head
jean-airoldie 5 years ago
parent 345d262972
commit ed0010126a
  1. 240
      src/protocol/mod.rs

@ -110,6 +110,12 @@ impl<Stream> WebSocket<Stream> {
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
self.context.set_config(set_func) self.context.set_config(set_func)
} }
/// Consume the websocket, returning the content of its message queue,
/// from most to least recently queued.
pub fn drain(self) -> Vec<Message> {
self.context.drain()
}
} }
impl<Stream: Read + Write> WebSocket<Stream> { impl<Stream: Read + Write> WebSocket<Stream> {
@ -370,6 +376,22 @@ impl WebSocketContext {
} }
self.write_pending(stream) self.write_pending(stream)
} }
/// Consume the websocket context, returning its queued messages,
/// from most to least recently queued.
pub fn drain(mut self) -> Vec<Message> {
let mut messages = Vec::with_capacity(self.send_queue.len());
let send_queue = replace(&mut self.send_queue, VecDeque::new());
for frame in send_queue.into_iter().rev() {
// This should not error since we created those frames.
if let Some(message) = self._read_message_frame(frame).unwrap() {
messages.push(message);
}
}
messages
}
} }
impl WebSocketContext { impl WebSocketContext {
@ -378,130 +400,134 @@ impl WebSocketContext {
where where
Stream: Read + Write, Stream: Read + Write,
{ {
if let Some(mut frame) = self.frame.read_frame(stream, self.config.max_frame_size)? { if let Some(frame) = self.frame.read_frame(stream, self.config.max_frame_size)? {
if !self.state.can_read() { self._read_message_frame(frame)
return Err(Error::Protocol( } else {
"Remote sent frame after having sent a Close Frame".into(), // Connection closed by peer
)); match replace(&mut self.state, WebSocketState::Terminated) {
} WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
// MUST be 0 unless an extension is negotiated that defines meanings Err(Error::ConnectionClosed)
// for non-zero values. If a nonzero value is received and none of
// the negotiated extensions defines the meaning of such a nonzero
// value, the receiving endpoint MUST _Fail the WebSocket
// Connection_.
{
let hdr = frame.header();
if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
return Err(Error::Protocol("Reserved bits are non-zero".into()));
} }
_ => Err(Error::Protocol(
"Connection reset without closing handshake".into(),
)),
} }
}
}
match self.role { fn _read_message_frame(&mut self, mut frame: Frame) -> Result<Option<Message>> {
Role::Server => { if !self.state.can_read() {
if frame.is_masked() { return Err(Error::Protocol(
// A server MUST remove masking for data frames received from a client "Remote sent frame after having sent a Close Frame".into(),
// as described in Section 5.3. (RFC 6455) ));
frame.apply_mask() }
} else { // MUST be 0 unless an extension is negotiated that defines meanings
// The server MUST close the connection upon receiving a // for non-zero values. If a nonzero value is received and none of
// frame that is not masked. (RFC 6455) // the negotiated extensions defines the meaning of such a nonzero
return Err(Error::Protocol( // value, the receiving endpoint MUST _Fail the WebSocket
"Received an unmasked frame from client".into(), // Connection_.
)); {
} let hdr = frame.header();
if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
return Err(Error::Protocol("Reserved bits are non-zero".into()));
}
}
match self.role {
Role::Server => {
if frame.is_masked() {
// A server MUST remove masking for data frames received from a client
// as described in Section 5.3. (RFC 6455)
frame.apply_mask()
} else {
// The server MUST close the connection upon receiving a
// frame that is not masked. (RFC 6455)
return Err(Error::Protocol(
"Received an unmasked frame from client".into(),
));
} }
Role::Client => { }
if frame.is_masked() { Role::Client => {
// A client MUST close a connection if it detects a masked frame. (RFC 6455) if frame.is_masked() {
return Err(Error::Protocol( // A client MUST close a connection if it detects a masked frame. (RFC 6455)
"Received a masked frame from server".into(), return Err(Error::Protocol(
)); "Received a masked frame from server".into(),
} ));
} }
} }
}
match frame.header().opcode { match frame.header().opcode {
OpCode::Control(ctl) => { OpCode::Control(ctl) => {
match ctl { match ctl {
// All control frames MUST have a payload length of 125 bytes or less // All control frames MUST have a payload length of 125 bytes or less
// and MUST NOT be fragmented. (RFC 6455) // and MUST NOT be fragmented. (RFC 6455)
_ if !frame.header().is_final => { _ if !frame.header().is_final => {
Err(Error::Protocol("Fragmented control frame".into())) Err(Error::Protocol("Fragmented control frame".into()))
} }
_ if frame.payload().len() > 125 => { _ if frame.payload().len() > 125 => {
Err(Error::Protocol("Control frame too big".into())) Err(Error::Protocol("Control frame too big".into()))
} }
OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)), OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
OpCtl::Reserved(i) => Err(Error::Protocol( OpCtl::Reserved(i) => Err(Error::Protocol(
format!("Unknown control frame type {}", i).into(), format!("Unknown control frame type {}", i).into(),
)), )),
OpCtl::Ping => { OpCtl::Ping => {
let data = frame.into_data(); let data = frame.into_data();
// No ping processing after we sent a close frame. // No ping processing after we sent a close frame.
if self.state.is_active() { if self.state.is_active() {
self.pong = Some(Frame::pong(data.clone())); self.pong = Some(Frame::pong(data.clone()));
}
Ok(Some(Message::Ping(data)))
} }
OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))), Ok(Some(Message::Ping(data)))
} }
OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))),
} }
}
OpCode::Data(data) => { OpCode::Data(data) => {
let fin = frame.header().is_final; let fin = frame.header().is_final;
match data { match data {
OpData::Continue => { OpData::Continue => {
if let Some(ref mut msg) = self.incomplete { if let Some(ref mut msg) = self.incomplete {
msg.extend(frame.into_data(), self.config.max_message_size)?; msg.extend(frame.into_data(), self.config.max_message_size)?;
} else { } else {
return Err(Error::Protocol( return Err(Error::Protocol(
"Continue frame but nothing to continue".into(), "Continue frame but nothing to continue".into(),
)); ));
}
if fin {
Ok(Some(self.incomplete.take().unwrap().complete()?))
} else {
Ok(None)
}
} }
c if self.incomplete.is_some() => Err(Error::Protocol( if fin {
format!("Received {} while waiting for more fragments", c).into(), Ok(Some(self.incomplete.take().unwrap().complete()?))
)), } else {
OpData::Text | OpData::Binary => { Ok(None)
let msg = { }
let message_type = match data { }
OpData::Text => IncompleteMessageType::Text, c if self.incomplete.is_some() => Err(Error::Protocol(
OpData::Binary => IncompleteMessageType::Binary, format!("Received {} while waiting for more fragments", c).into(),
_ => panic!("Bug: message is not text nor binary"), )),
}; OpData::Text | OpData::Binary => {
let mut m = IncompleteMessage::new(message_type); let msg = {
m.extend(frame.into_data(), self.config.max_message_size)?; let message_type = match data {
m OpData::Text => IncompleteMessageType::Text,
OpData::Binary => IncompleteMessageType::Binary,
_ => panic!("Bug: message is not text nor binary"),
}; };
if fin { let mut m = IncompleteMessage::new(message_type);
Ok(Some(msg.complete()?)) m.extend(frame.into_data(), self.config.max_message_size)?;
} else { m
self.incomplete = Some(msg); };
Ok(None) if fin {
} Ok(Some(msg.complete()?))
} else {
self.incomplete = Some(msg);
Ok(None)
} }
OpData::Reserved(i) => Err(Error::Protocol(
format!("Unknown data frame type {}", i).into(),
)),
} }
OpData::Reserved(i) => Err(Error::Protocol(
format!("Unknown data frame type {}", i).into(),
)),
} }
} // match opcode
} else {
// Connection closed by peer
match replace(&mut self.state, WebSocketState::Terminated) {
WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
Err(Error::ConnectionClosed)
}
_ => Err(Error::Protocol(
"Connection reset without closing handshake".into(),
)),
} }
} } // match opcode
} }
/// Received a close frame. Tells if we need to return a close frame to the user. /// Received a close frame. Tells if we need to return a close frame to the user.

Loading…
Cancel
Save