diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a60687..36a1893 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ - Remove many implicit flushing behaviours. In general reading and writing messages will no longer flush until calling `flush`. An exception is automatic responses (e.g. pongs) which will continue to be written and flushed when reading and writing. - This allows writing a batch of messages and flushing once. + This allows writing a batch of messages and flushing once, improving performance. - Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`. - Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`. Note: Previous use of `write_frame` may be replaced with `send`. @@ -12,6 +12,8 @@ * Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`. * Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`. Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`. +- Add ability to buffer multiple writes before writing to the underlying stream, controlled by + `WebSocketConfig::write_buffer_size` (default 128 KiB). Improves batch message write performance. # 0.19.0 diff --git a/benches/write.rs b/benches/write.rs index 8a19874..7908818 100644 --- a/benches/write.rs +++ b/benches/write.rs @@ -9,43 +9,51 @@ use tungstenite::{Message, WebSocket}; const MOCK_WRITE_LEN: usize = 8 * 1024 * 1024; -/// `Write` impl that simulates fast writes and slow flushes. +/// `Write` impl that simulates slowish writes and slow flushes. /// -/// Buffers up to 8 MiB fast on `write`. Each `flush` takes ~100ns. -struct MockSlowFlushWrite(Vec); +/// Each `write` can buffer up to 8 MiB before flushing but takes an additional **~80ns** +/// to simulate stuff going on in the underlying stream. +/// Each `flush` takes **~8µs** to simulate flush io. +struct MockWrite(Vec); -impl Read for MockSlowFlushWrite { +impl Read for MockWrite { fn read(&mut self, _: &mut [u8]) -> io::Result { Err(io::Error::new(io::ErrorKind::WouldBlock, "reads not supported")) } } -impl Write for MockSlowFlushWrite { +impl Write for MockWrite { fn write(&mut self, buf: &[u8]) -> io::Result { if self.0.len() + buf.len() > MOCK_WRITE_LEN { self.flush()?; } + // simulate io + spin(Duration::from_nanos(80)); self.0.extend(buf); Ok(buf.len()) } fn flush(&mut self) -> io::Result<()> { if !self.0.is_empty() { - // simulate 100ns io - let a = Instant::now(); - while a.elapsed() < Duration::from_nanos(100) { - hint::spin_loop(); - } + // simulate io + spin(Duration::from_micros(8)); self.0.clear(); } Ok(()) } } +fn spin(duration: Duration) { + let a = Instant::now(); + while a.elapsed() < duration { + hint::spin_loop(); + } +} + fn benchmark(c: &mut Criterion) { // Writes 100k small json text messages then flushes c.bench_function("write 100k small texts then flush", |b| { let mut ws = WebSocket::from_raw_socket( - MockSlowFlushWrite(Vec::with_capacity(MOCK_WRITE_LEN)), + MockWrite(Vec::with_capacity(MOCK_WRITE_LEN)), tungstenite::protocol::Role::Server, None, ); diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index bb72e5a..7d2ee41 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -80,7 +80,7 @@ where /// is returned. /// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards. pub fn write(&mut self, frame: Frame) -> Result<()> { - self.codec.write_frame(&mut self.stream, frame) + self.codec.buffer_frame(&mut self.stream, frame) } /// Flush writes. @@ -99,6 +99,12 @@ pub(super) struct FrameCodec { out_buffer: Vec, /// Capacity limit for `out_buffer`. max_out_buffer_len: usize, + /// Buffer target length to reach before writing to the stream + /// on calls to `buffer_frame`. + /// + /// Setting this to non-zero will buffer small writes from hitting + /// the stream. + out_buffer_write_len: usize, /// Header and remaining size of the incoming packet being processed. header: Option<(FrameHeader, u64)>, } @@ -110,6 +116,7 @@ impl FrameCodec { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), max_out_buffer_len: usize::MAX, + out_buffer_write_len: 0, header: None, } } @@ -120,19 +127,20 @@ impl FrameCodec { in_buffer: ReadBuffer::from_partially_read(part), out_buffer: Vec::new(), max_out_buffer_len: usize::MAX, + out_buffer_write_len: 0, header: None, } } /// Sets a maximum size for the out buffer. - pub(super) fn with_max_out_buffer_len(mut self, max: usize) -> Self { + pub(super) fn set_max_out_buffer_len(&mut self, max: usize) { self.max_out_buffer_len = max; - self } - /// Sets a maximum size for the out buffer. - pub(super) fn set_max_out_buffer_len(&mut self, max: usize) { - self.max_out_buffer_len = max; + /// Sets [`Self::buffer_frame`] buffer target length to reach before + /// writing to the stream. + pub(super) fn set_out_buffer_write_len(&mut self, len: usize) { + self.out_buffer_write_len = len; } /// Read a frame from the provided stream. @@ -193,10 +201,14 @@ impl FrameCodec { Ok(Some(frame)) } - /// Write a frame to the provided stream. + /// Writes a frame into the `out_buffer`. + /// If the out buffer size is over the `out_buffer_write_len` will also write + /// the out buffer into the provided `stream`. /// - /// Does **not** flush. - pub(super) fn write_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> + /// To ensure buffered frames are written call [`Self::write_out_buffer`]. + /// + /// May write to the stream, will **not** flush. + pub(super) fn buffer_frame(&mut self, stream: &mut Stream, frame: Frame) -> Result<()> where Stream: Write, { @@ -209,10 +221,14 @@ impl FrameCodec { self.out_buffer.reserve(frame.len()); frame.format(&mut self.out_buffer).expect("Bug: can't write to vector"); - self.write_out_buffer(stream) + if self.out_buffer.len() > self.out_buffer_write_len { + self.write_out_buffer(stream) + } else { + Ok(()) + } } - /// Write any buffered frames to the provided stream. + /// Writes the out_buffer to the provided stream. /// /// Does **not** flush. pub(super) fn write_out_buffer(&mut self, stream: &mut Stream) -> Result<()> diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 1100a67..db32586 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -38,8 +38,17 @@ pub struct WebSocketConfig { /// Does nothing, instead use `max_write_buffer_size`. #[deprecated] pub max_send_queue: Option, - /// The max size of the write buffer in bytes. Setting this can provide backpressure. + /// The target minimum size of the write buffer to reach before writing the data + /// to the underlying stream. + /// The default value is 128 KiB. + /// + /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless. + pub write_buffer_size: usize, + /// The max size of the write buffer in bytes. Setting this can provide backpressure + /// in the case the write buffer is filling up due to write errors. /// The default value is unlimited. + /// + /// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size). pub max_write_buffer_size: usize, /// The maximum size of a message. `None` means no size limit. The default value is 64 MiB /// which should be reasonably big for all normal use-cases but small enough to prevent @@ -63,6 +72,7 @@ impl Default for WebSocketConfig { #[allow(deprecated)] WebSocketConfig { max_send_queue: None, + write_buffer_size: 128 * 1024, max_write_buffer_size: usize::MAX, max_message_size: Some(64 << 20), max_frame_size: Some(16 << 20), @@ -282,11 +292,20 @@ pub struct WebSocketContext { impl WebSocketContext { /// Create a WebSocket context that manages a post-handshake stream. pub fn new(role: Role, config: Option) -> Self { - let config = config.unwrap_or_default(); + Self::_new(role, FrameCodec::new(), config.unwrap_or_default()) + } + + /// Create a WebSocket context that manages an post-handshake stream. + pub fn from_partially_read(part: Vec, role: Role, config: Option) -> Self { + Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default()) + } - WebSocketContext { + fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self { + frame.set_max_out_buffer_len(config.max_write_buffer_size); + frame.set_out_buffer_write_len(config.write_buffer_size); + Self { role, - frame: FrameCodec::new().with_max_out_buffer_len(config.max_write_buffer_size), + frame, state: WebSocketState::Active, incomplete: None, additional_send: None, @@ -294,20 +313,11 @@ impl WebSocketContext { } } - /// Create a WebSocket context that manages an post-handshake stream. - pub fn from_partially_read(part: Vec, role: Role, config: Option) -> Self { - let config = config.unwrap_or_default(); - WebSocketContext { - frame: FrameCodec::from_partially_read(part) - .with_max_out_buffer_len(config.max_write_buffer_size), - ..WebSocketContext::new(role, Some(config)) - } - } - /// Change the configuration. pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) { set_func(&mut self.config); self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size); + self.frame.set_out_buffer_write_len(self.config.write_buffer_size); } /// Read the configuration. @@ -412,6 +422,7 @@ impl WebSocketContext { Stream: Read + Write, { self._write(stream, None)?; + self.frame.write_out_buffer(stream)?; Ok(stream.flush()?) } @@ -424,9 +435,8 @@ impl WebSocketContext { where Stream: Read + Write, { - match data { - Some(data) => self.write_one_frame(stream, data)?, - None => self.frame.write_out_buffer(stream)?, + if let Some(data) = data { + self.buffer_frame(stream, data)?; } // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in @@ -434,7 +444,7 @@ impl WebSocketContext { // respond with Pong frame as soon as is practical. (RFC 6455) let should_flush = if let Some(msg) = self.additional_send.take() { trace!("Sending pong/close"); - match self.write_one_frame(stream, msg) { + match self.buffer_frame(stream, msg) { Err(Error::WriteBufferFull(Message::Frame(msg))) => { // if an system message would exceed the buffer put it back in // `additional_send` for retry. Otherwise returning this error @@ -457,6 +467,7 @@ impl WebSocketContext { // maximum segment lifetimes (2MSL), while there is no corresponding // server impact as a TIME_WAIT connection is immediately reopened upon // a new SYN with a higher seq number). (RFC 6455) + self.frame.write_out_buffer(stream)?; self.state = WebSocketState::Terminated; Err(Error::ConnectionClosed) } else { @@ -468,7 +479,7 @@ impl WebSocketContext { /// /// This function guarantees that the close frame will be queued. /// There is no need to call it again. Calling this function is - /// the same as calling `write(Message::Close(..))`. + /// the same as calling `send(Message::Close(..))`. pub fn close(&mut self, stream: &mut Stream, code: Option) -> Result<()> where Stream: Read + Write, @@ -477,10 +488,8 @@ impl WebSocketContext { self.state = WebSocketState::ClosedByUs; let frame = Frame::close(code); self._write(stream, Some(frame))?; - } else { - // Already closed, nothing to do. } - Ok(stream.flush()?) + self.flush(stream) } /// Try to decode one message frame. May return None. @@ -650,8 +659,8 @@ impl WebSocketContext { } } - /// Write a single frame into the stream via the write-buffer. - fn write_one_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> + /// Write a single frame into the write-buffer. + fn buffer_frame(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()> where Stream: Read + Write, { @@ -665,7 +674,7 @@ impl WebSocketContext { } trace!("Sending frame: {:?}", frame); - self.frame.write_frame(stream, frame).check_connection_reset(self.state) + self.frame.buffer_frame(stream, frame).check_connection_reset(self.state) } /// Replace `additional_send` if it is currently a `Pong` message. diff --git a/tests/write.rs b/tests/write.rs new file mode 100644 index 0000000..aa627cc --- /dev/null +++ b/tests/write.rs @@ -0,0 +1,68 @@ +use std::io::{self, Read, Write}; +use tungstenite::{protocol::WebSocketConfig, Message, WebSocket}; + +/// `Write` impl that records call stats and drops the data. +#[derive(Debug, Default)] +struct MockWrite { + written_bytes: usize, + write_count: usize, + flush_count: usize, +} + +impl Read for MockWrite { + fn read(&mut self, _: &mut [u8]) -> io::Result { + Err(io::Error::new(io::ErrorKind::WouldBlock, "reads not supported")) + } +} +impl Write for MockWrite { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.written_bytes += buf.len(); + self.write_count += 1; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + self.flush_count += 1; + Ok(()) + } +} + +/// Test for write buffering and flushing behaviour. +#[test] +fn write_flush_behaviour() { + const SEND_ME_LEN: usize = 10; + const BATCH_ME_LEN: usize = 11; + const WRITE_BUFFER_SIZE: usize = 600; + + let mut ws = WebSocket::from_raw_socket( + MockWrite::default(), + tungstenite::protocol::Role::Server, + Some(WebSocketConfig { write_buffer_size: WRITE_BUFFER_SIZE, ..<_>::default() }), + ); + + assert_eq!(ws.get_ref().written_bytes, 0); + assert_eq!(ws.get_ref().write_count, 0); + assert_eq!(ws.get_ref().flush_count, 0); + + // `send` writes & flushes immediately + ws.send(Message::Text("Send me!".into())).unwrap(); + assert_eq!(ws.get_ref().written_bytes, SEND_ME_LEN); + assert_eq!(ws.get_ref().write_count, 1); + assert_eq!(ws.get_ref().flush_count, 1); + + // send a batch of messages + for msg in (0..100).map(|_| Message::Text("Batch me!".into())) { + ws.write(msg).unwrap(); + } + // after 55 writes the out_buffer will exceed write_buffer_size=600 + // and so do a single underlying write (not flushing). + assert_eq!(ws.get_ref().written_bytes, 55 * BATCH_ME_LEN + SEND_ME_LEN); + assert_eq!(ws.get_ref().write_count, 2); + assert_eq!(ws.get_ref().flush_count, 1); + + // flushing will perform a single write for the remaining out_buffer & flush. + ws.flush().unwrap(); + assert_eq!(ws.get_ref().written_bytes, 100 * BATCH_ME_LEN + SEND_ME_LEN); + assert_eq!(ws.get_ref().write_count, 3); + assert_eq!(ws.get_ref().flush_count, 2); +}