Buffer writes before writing to the underlying stream

Add write_buffer_size
Set default 128 KiB
pull/358/head
Alex Butler 2 years ago
parent 2cf7cfef04
commit 2ef5b9a5e2
  1. 4
      CHANGELOG.md
  2. 34
      src/protocol/frame/mod.rs
  3. 44
      src/protocol/mod.rs

@ -2,7 +2,7 @@
- Remove many implicit flushing behaviours. In general reading and writing messages will no - Remove many implicit flushing behaviours. In general reading and writing messages will no
longer flush until calling `flush`. An exception is automatic responses (e.g. pongs) longer flush until calling `flush`. An exception is automatic responses (e.g. pongs)
which will continue to be written and flushed when reading and writing. which will continue to be written and flushed when reading and writing.
This allows writing a batch of messages and flushing once. This allows writing a batch of messages and flushing once, improving performance.
- Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`. - Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`.
- Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`. - Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`.
Note: Previous use of `write_frame` may be replaced with `send`. Note: Previous use of `write_frame` may be replaced with `send`.
@ -12,6 +12,8 @@
* Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`. * Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`.
* Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`. * Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`.
Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`. Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`.
- Add ability to buffer multiple writes before writing to the underlying stream, controlled by
`WebSocketConfig::write_buffer_size` (default 128 KiB). Improves batch message write performance.
# 0.19.0 # 0.19.0

@ -80,7 +80,7 @@ where
/// is returned. /// is returned.
/// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards. /// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards.
pub fn write(&mut self, frame: Frame) -> Result<()> { pub fn write(&mut self, frame: Frame) -> Result<()> {
self.codec.write_frame(&mut self.stream, frame) self.codec.buffer_frame(&mut self.stream, frame)
} }
/// Flush writes. /// Flush writes.
@ -99,6 +99,12 @@ pub(super) struct FrameCodec {
out_buffer: Vec<u8>, out_buffer: Vec<u8>,
/// Capacity limit for `out_buffer`. /// Capacity limit for `out_buffer`.
max_out_buffer_len: usize, max_out_buffer_len: usize,
/// Buffer target length to reach before writing to the stream
/// on calls to `buffer_frame`.
///
/// Setting this to non-zero will buffer small writes from hitting
/// the stream.
out_buffer_write_len: usize,
/// Header and remaining size of the incoming packet being processed. /// Header and remaining size of the incoming packet being processed.
header: Option<(FrameHeader, u64)>, header: Option<(FrameHeader, u64)>,
} }
@ -110,6 +116,7 @@ impl FrameCodec {
in_buffer: ReadBuffer::new(), in_buffer: ReadBuffer::new(),
out_buffer: Vec::new(), out_buffer: Vec::new(),
max_out_buffer_len: usize::MAX, max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
header: None, header: None,
} }
} }
@ -120,19 +127,20 @@ impl FrameCodec {
in_buffer: ReadBuffer::from_partially_read(part), in_buffer: ReadBuffer::from_partially_read(part),
out_buffer: Vec::new(), out_buffer: Vec::new(),
max_out_buffer_len: usize::MAX, max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
header: None, header: None,
} }
} }
/// Sets a maximum size for the out buffer. /// Sets a maximum size for the out buffer.
pub(super) fn with_max_out_buffer_len(mut self, max: usize) -> Self { pub(super) fn set_max_out_buffer_len(&mut self, max: usize) {
self.max_out_buffer_len = max; self.max_out_buffer_len = max;
self
} }
/// Sets a maximum size for the out buffer. /// Sets [`Self::buffer_frame`] buffer target length to reach before
pub(super) fn set_max_out_buffer_len(&mut self, max: usize) { /// writing to the stream.
self.max_out_buffer_len = max; pub(super) fn set_target_buffer_write_len(&mut self, len: usize) {
self.out_buffer_write_len = len;
} }
/// Read a frame from the provided stream. /// Read a frame from the provided stream.
@ -193,10 +201,12 @@ impl FrameCodec {
Ok(Some(frame)) Ok(Some(frame))
} }
/// Write a frame to the provided stream. /// Writes a frame into the `out_buffer`.
/// If the out buffer size is over the `out_buffer_write_len` will also write
/// the out buffer into the provided `stream`.
/// ///
/// Does **not** flush. /// Does **not** flush.
pub(super) fn write_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> pub(super) fn buffer_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()>
where where
Stream: Write, Stream: Write,
{ {
@ -209,10 +219,14 @@ impl FrameCodec {
self.out_buffer.reserve(frame.len()); self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
self.write_out_buffer(stream) if self.out_buffer.len() > self.out_buffer_write_len {
self.write_out_buffer(stream)
} else {
Ok(())
}
} }
/// Write any buffered frames to the provided stream. /// Writes the out_buffer to the provided stream.
/// ///
/// Does **not** flush. /// Does **not** flush.
pub(super) fn write_out_buffer<Stream>(&mut self, stream: &mut Stream) -> Result<()> pub(super) fn write_out_buffer<Stream>(&mut self, stream: &mut Stream) -> Result<()>

@ -38,8 +38,17 @@ pub struct WebSocketConfig {
/// Does nothing, instead use `max_write_buffer_size`. /// Does nothing, instead use `max_write_buffer_size`.
#[deprecated] #[deprecated]
pub max_send_queue: Option<usize>, pub max_send_queue: Option<usize>,
/// The max size of the write buffer in bytes. Setting this can provide backpressure. /// The target minimum size of the write buffer to reach before writing the data
/// to the underlying stream.
/// The default value is 128 KiB.
///
/// Note: [`flush`](WebSocket::flush) will always be fully write the buffer regardless.
pub write_buffer_size: usize,
/// The max size of the write buffer in bytes. Setting this can provide backpressure
/// in the case the write buffer is filling up due to write errors.
/// The default value is unlimited. /// The default value is unlimited.
///
/// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size).
pub max_write_buffer_size: usize, pub max_write_buffer_size: usize,
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB /// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
/// which should be reasonably big for all normal use-cases but small enough to prevent /// which should be reasonably big for all normal use-cases but small enough to prevent
@ -63,6 +72,7 @@ impl Default for WebSocketConfig {
#[allow(deprecated)] #[allow(deprecated)]
WebSocketConfig { WebSocketConfig {
max_send_queue: None, max_send_queue: None,
write_buffer_size: 128 * 1024,
max_write_buffer_size: usize::MAX, max_write_buffer_size: usize::MAX,
max_message_size: Some(64 << 20), max_message_size: Some(64 << 20),
max_frame_size: Some(16 << 20), max_frame_size: Some(16 << 20),
@ -283,10 +293,25 @@ impl WebSocketContext {
/// Create a WebSocket context that manages a post-handshake stream. /// Create a WebSocket context that manages a post-handshake stream.
pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self { pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
let config = config.unwrap_or_default(); let config = config.unwrap_or_default();
let mut frame = FrameCodec::new();
frame.set_max_out_buffer_len(config.max_write_buffer_size);
frame.set_target_buffer_write_len(config.write_buffer_size);
Self::_new(role, frame, config)
}
/// Create a WebSocket context that manages an post-handshake stream.
pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
let config = config.unwrap_or_default();
let mut frame = FrameCodec::from_partially_read(part);
frame.set_max_out_buffer_len(config.max_write_buffer_size);
frame.set_target_buffer_write_len(config.write_buffer_size);
Self::_new(role, frame, config)
}
WebSocketContext { fn _new(role: Role, frame: FrameCodec, config: WebSocketConfig) -> Self {
Self {
role, role,
frame: FrameCodec::new().with_max_out_buffer_len(config.max_write_buffer_size), frame,
state: WebSocketState::Active, state: WebSocketState::Active,
incomplete: None, incomplete: None,
additional_send: None, additional_send: None,
@ -294,20 +319,11 @@ impl WebSocketContext {
} }
} }
/// Create a WebSocket context that manages an post-handshake stream.
pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
let config = config.unwrap_or_default();
WebSocketContext {
frame: FrameCodec::from_partially_read(part)
.with_max_out_buffer_len(config.max_write_buffer_size),
..WebSocketContext::new(role, Some(config))
}
}
/// Change the configuration. /// Change the configuration.
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
set_func(&mut self.config); set_func(&mut self.config);
self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
self.frame.set_target_buffer_write_len(self.config.write_buffer_size);
} }
/// Read the configuration. /// Read the configuration.
@ -665,7 +681,7 @@ impl WebSocketContext {
} }
trace!("Sending frame: {:?}", frame); trace!("Sending frame: {:?}", frame);
self.frame.write_frame(stream, frame).check_connection_reset(self.state) self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
} }
/// Replace `additional_send` if it is currently a `Pong` message. /// Replace `additional_send` if it is currently a `Pong` message.

Loading…
Cancel
Save