Adds support for outbound message chunking

pull/144/head
SirCipher 5 years ago
parent 31b50bf6b5
commit 3cf0b83949
  1. 73
      src/protocol/mod.rs

@ -19,6 +19,7 @@ use crate::error::{Error, Result};
use crate::extensions::uncompressed::UncompressedExt;
use crate::extensions::WebSocketExtension;
use crate::util::NonBlockingResult;
use crate::protocol::frame::coding::Data;
pub(crate) const MAX_MESSAGE_SIZE: usize = 64 << 20;
@ -34,7 +35,7 @@ pub enum Role {
/// The configuration for WebSocket connection.
#[derive(Debug, Copy, Clone)]
pub struct WebSocketConfig<E = UncompressedExt>
where
where
E: WebSocketExtension,
{
/// The size of the send queue. You can use it to turn on/off the backpressure features. `None`
@ -51,7 +52,7 @@ where
}
impl<E> Default for WebSocketConfig<E>
where
where
E: WebSocketExtension,
{
fn default() -> Self {
@ -64,7 +65,7 @@ where
}
impl<E> WebSocketConfig<E>
where
where
E: WebSocketExtension,
{
/// Creates a `WebSocketConfig` instance using the default configuration and the provided
@ -84,7 +85,7 @@ where
/// It may be created by calling `connect`, `accept` or `client` functions.
#[derive(Debug)]
pub struct WebSocket<Stream, Ext>
where
where
Ext: WebSocketExtension,
{
/// The underlying socket.
@ -94,7 +95,7 @@ where
}
impl<Stream, Ext> WebSocket<Stream, Ext>
where
where
Ext: WebSocketExtension,
{
/// Convert a raw socket into a WebSocket without performing a handshake.
@ -167,7 +168,7 @@ where
}
impl<Stream, Ext> WebSocket<Stream, Ext>
where
where
Stream: Read + Write,
Ext: WebSocketExtension,
{
@ -254,7 +255,7 @@ where
/// A context for managing WebSocket stream.
#[derive(Debug)]
pub struct WebSocketContext<Ext = UncompressedExt>
where
where
Ext: WebSocketExtension,
{
/// Server or client?
@ -274,7 +275,7 @@ where
}
impl<Ext> WebSocketContext<Ext>
where
where
Ext: WebSocketExtension,
{
/// Create a WebSocket context that manages a post-handshake stream.
@ -608,11 +609,28 @@ where
};
}
let max_frame_size = self.config.max_frame_size.unwrap_or_else(usize::max_value);
if frame.payload().len() > max_frame_size {
let mut chunks = frame.payload().chunks(self.config.max_frame_size.unwrap_or_else(usize::max_value)).peekable();
let data_frame = Frame::message(Vec::from(chunks.next().unwrap()), frame.header().opcode, false);
self.frame.write_frame(stream, data_frame).check_connection_reset(self.state)?;
while let Some(chunk) = chunks.next() {
let frame = Frame::message(Vec::from(chunk), OpCode::Data(Data::Continue), chunks.peek().is_none());
trace!("Sending frame: {:?}", frame);
self.frame.write_frame(stream, frame).check_connection_reset(self.state)?;
}
Ok(())
} else {
trace!("Sending frame: {:?}", frame);
self.frame
.write_frame(stream, frame)
.check_connection_reset(self.state)
}
}
}
/// The current connection state.
@ -685,6 +703,8 @@ mod tests {
use crate::extensions::uncompressed::UncompressedExt;
use std::io;
use std::io::Cursor;
use crate::protocol::frame::Frame;
use crate::protocol::frame::coding::{OpCode, Data};
struct WriteMoc<Stream>(Stream);
@ -756,4 +776,41 @@ mod tests {
"Space limit exceeded: Message too big: 0 + 3 > 2"
);
}
#[test]
fn fragmented_tx() {
let max_message_size = 2;
let input_str = "hello unit test";
let limit = WebSocketConfig {
max_send_queue: None,
max_frame_size: Some(2),
encoder: UncompressedExt::new(Some(max_message_size)),
};
let mut socket = WebSocket::from_raw_socket(Cursor::new(Vec::new()), Role::Client, Some(limit));
socket.write_message(Message::text(input_str)).unwrap();
socket.socket.set_position(0);
let WebSocket { mut socket, mut context } = socket;
let vec = input_str.chars().collect::<Vec<_>>();
let mut iter = vec.chunks(max_message_size).map(|c| c.iter().collect::<String>())
.into_iter().peekable();
let frame_eq = |expected: Frame, actual: Frame| {
assert_eq!(expected.payload(), actual.payload());
assert_eq!(expected.header().opcode, actual.header().opcode);
assert_eq!(expected.header().rsv1, actual.header().rsv1);
};
let expected = Frame::message(iter.next().unwrap().into(), OpCode::Data(Data::Text), false);
frame_eq(expected, context.frame.read_frame(&mut socket, Some(max_message_size)).unwrap().unwrap());
while let Some(chars) = iter.next() {
let expected = Frame::message(chars.into(), OpCode::Data(Data::Continue), iter.peek().is_none());
frame_eq(expected, context.frame.read_frame(&mut socket, Some(max_message_size)).unwrap().unwrap());
}
}
}

Loading…
Cancel
Save