From 2ef5b9a5e2a66ebbd7b573ad7b33a85aa7cbafb6 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Tue, 30 May 2023 10:46:57 +0100 Subject: [PATCH] Buffer writes before writing to the underlying stream Add write_buffer_size Set default 128 KiB --- CHANGELOG.md | 4 +++- src/protocol/frame/mod.rs | 34 +++++++++++++++++++++--------- src/protocol/mod.rs | 44 ++++++++++++++++++++++++++------------- 3 files changed, 57 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a60687..36a1893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ - Remove many implicit flushing behaviours. In general reading and writing messages will no longer flush until calling `flush`. An exception is automatic responses (e.g. pongs) which will continue to be written and flushed when reading and writing. - This allows writing a batch of messages and flushing once. + This allows writing a batch of messages and flushing once, improving performance. - Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`. - Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`. Note: Previous use of `write_frame` may be replaced with `send`. @@ -12,6 +12,8 @@ * Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`. * Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`. Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`. +- Add ability to buffer multiple writes before writing to the underlying stream, controlled by + `WebSocketConfig::write_buffer_size` (default 128 KiB). Improves batch message write performance. # 0.19.0 diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index bb72e5a..b02c1e8 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -80,7 +80,7 @@ where /// is returned. /// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards. pub fn write(&mut self, frame: Frame) -> Result<()> { - self.codec.write_frame(&mut self.stream, frame) + self.codec.buffer_frame(&mut self.stream, frame) } /// Flush writes. @@ -99,6 +99,12 @@ pub(super) struct FrameCodec { out_buffer: Vec, /// Capacity limit for `out_buffer`. max_out_buffer_len: usize, + /// Buffer target length to reach before writing to the stream + /// on calls to `buffer_frame`. + /// + /// Setting this to non-zero will buffer small writes from hitting + /// the stream. + out_buffer_write_len: usize, /// Header and remaining size of the incoming packet being processed. header: Option<(FrameHeader, u64)>, } @@ -110,6 +116,7 @@ impl FrameCodec { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), max_out_buffer_len: usize::MAX, + out_buffer_write_len: 0, header: None, } } @@ -120,19 +127,20 @@ impl FrameCodec { in_buffer: ReadBuffer::from_partially_read(part), out_buffer: Vec::new(), max_out_buffer_len: usize::MAX, + out_buffer_write_len: 0, header: None, } } /// Sets a maximum size for the out buffer. - pub(super) fn with_max_out_buffer_len(mut self, max: usize) -> Self { + pub(super) fn set_max_out_buffer_len(&mut self, max: usize) { self.max_out_buffer_len = max; - self } - /// Sets a maximum size for the out buffer. - pub(super) fn set_max_out_buffer_len(&mut self, max: usize) { - self.max_out_buffer_len = max; + /// Sets [`Self::buffer_frame`] buffer target length to reach before + /// writing to the stream. + pub(super) fn set_target_buffer_write_len(&mut self, len: usize) { + self.out_buffer_write_len = len; } /// Read a frame from the provided stream. @@ -193,10 +201,12 @@ impl FrameCodec { Ok(Some(frame)) } - /// Write a frame to the provided stream. + /// Writes a frame into the `out_buffer`. + /// If the out buffer size is over the `out_buffer_write_len` will also write + /// the out buffer into the provided `stream`. /// /// Does **not** flush. - pub(super) fn write_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> + pub(super) fn buffer_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> where Stream: Write, { @@ -209,10 +219,14 @@ impl FrameCodec { self.out_buffer.reserve(frame.len()); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); - self.write_out_buffer(stream) + if self.out_buffer.len() > self.out_buffer_write_len { + self.write_out_buffer(stream) + } else { + Ok(()) + } } - /// Write any buffered frames to the provided stream. + /// Writes the out_buffer to the provided stream. /// /// Does **not** flush. pub(super) fn write_out_buffer(&mut self, stream: &mut Stream) -> Result<()> diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 1100a67..d668837 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -38,8 +38,17 @@ pub struct WebSocketConfig { /// Does nothing, instead use `max_write_buffer_size`. #[deprecated] pub max_send_queue: Option, - /// The max size of the write buffer in bytes. Setting this can provide backpressure. + /// The target minimum size of the write buffer to reach before writing the data + /// to the underlying stream. + /// The default value is 128 KiB. + /// + /// Note: [`flush`](WebSocket::flush) will always be fully write the buffer regardless. + pub write_buffer_size: usize, + /// The max size of the write buffer in bytes. Setting this can provide backpressure + /// in the case the write buffer is filling up due to write errors. /// The default value is unlimited. + /// + /// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size). pub max_write_buffer_size: usize, /// The maximum size of a message. `None` means no size limit. The default value is 64 MiB /// which should be reasonably big for all normal use-cases but small enough to prevent @@ -63,6 +72,7 @@ impl Default for WebSocketConfig { #[allow(deprecated)] WebSocketConfig { max_send_queue: None, + write_buffer_size: 128 * 1024, max_write_buffer_size: usize::MAX, max_message_size: Some(64 << 20), max_frame_size: Some(16 << 20), @@ -283,10 +293,25 @@ impl WebSocketContext { /// Create a WebSocket context that manages a post-handshake stream. pub fn new(role: Role, config: Option) -> Self { let config = config.unwrap_or_default(); + let mut frame = FrameCodec::new(); + frame.set_max_out_buffer_len(config.max_write_buffer_size); + frame.set_target_buffer_write_len(config.write_buffer_size); + Self::_new(role, frame, config) + } + + /// Create a WebSocket context that manages an post-handshake stream. + pub fn from_partially_read(part: Vec, role: Role, config: Option) -> Self { + let config = config.unwrap_or_default(); + let mut frame = FrameCodec::from_partially_read(part); + frame.set_max_out_buffer_len(config.max_write_buffer_size); + frame.set_target_buffer_write_len(config.write_buffer_size); + Self::_new(role, frame, config) + } - WebSocketContext { + fn _new(role: Role, frame: FrameCodec, config: WebSocketConfig) -> Self { + Self { role, - frame: FrameCodec::new().with_max_out_buffer_len(config.max_write_buffer_size), + frame, state: WebSocketState::Active, incomplete: None, additional_send: None, @@ -294,20 +319,11 @@ impl WebSocketContext { } } - /// Create a WebSocket context that manages an post-handshake stream. - pub fn from_partially_read(part: Vec, role: Role, config: Option) -> Self { - let config = config.unwrap_or_default(); - WebSocketContext { - frame: FrameCodec::from_partially_read(part) - .with_max_out_buffer_len(config.max_write_buffer_size), - ..WebSocketContext::new(role, Some(config)) - } - } - /// Change the configuration. pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { set_func(&mut self.config); self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); + self.frame.set_target_buffer_write_len(self.config.write_buffer_size); } /// Read the configuration. @@ -665,7 +681,7 @@ impl WebSocketContext { } trace!("Sending frame: {:?}", frame); - self.frame.write_frame(stream, frame).check_connection_reset(self.state) + self.frame.buffer_frame(stream, frame).check_connection_reset(self.state) } /// Replace `additional_send` if it is currently a `Pong` message.