Merge pull request #358 from alexheretic/buffer-writes

Buffer writes before writing to the underlying stream
pull/359/head
Alexey Galakhov 2 years ago committed by GitHub
commit 8f23e1765e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CHANGELOG.md
  2. 30
      benches/write.rs
  3. 36
      src/protocol/frame/mod.rs
  4. 59
      src/protocol/mod.rs
  5. 68
      tests/write.rs

@ -2,7 +2,7 @@
- Remove many implicit flushing behaviours. In general reading and writing messages will no
longer flush until calling `flush`. An exception is automatic responses (e.g. pongs)
which will continue to be written and flushed when reading and writing.
This allows writing a batch of messages and flushing once.
This allows writing a batch of messages and flushing once, improving performance.
- Add `WebSocket::read`, `write`, `send`, `flush`. Deprecate `read_message`, `write_message`, `write_pending`.
- Add `FrameSocket::read`, `write`, `send`, `flush`. Remove `read_frame`, `write_frame`, `write_pending`.
Note: Previous use of `write_frame` may be replaced with `send`.
@ -12,6 +12,8 @@
* Add `WebSocketConfig::max_write_buffer_size`. Deprecate `max_send_queue`.
* Add `Error::WriteBufferFull`. Remove `Error::SendQueueFull`.
Note: `WriteBufferFull` returns the message that could not be written as a `Message::Frame`.
- Add ability to buffer multiple writes before writing to the underlying stream, controlled by
`WebSocketConfig::write_buffer_size` (default 128 KiB). Improves batch message write performance.
# 0.19.0

@ -9,43 +9,51 @@ use tungstenite::{Message, WebSocket};
const MOCK_WRITE_LEN: usize = 8 * 1024 * 1024;
/// `Write` impl that simulates fast writes and slow flushes.
/// `Write` impl that simulates slowish writes and slow flushes.
///
/// Buffers up to 8 MiB fast on `write`. Each `flush` takes ~100ns.
struct MockSlowFlushWrite(Vec<u8>);
/// Each `write` can buffer up to 8 MiB before flushing but takes an additional **~80ns**
/// to simulate stuff going on in the underlying stream.
/// Each `flush` takes **~8µs** to simulate flush io.
struct MockWrite(Vec<u8>);
impl Read for MockSlowFlushWrite {
impl Read for MockWrite {
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
Err(io::Error::new(io::ErrorKind::WouldBlock, "reads not supported"))
}
}
impl Write for MockSlowFlushWrite {
impl Write for MockWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.0.len() + buf.len() > MOCK_WRITE_LEN {
self.flush()?;
}
// simulate io
spin(Duration::from_nanos(80));
self.0.extend(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
if !self.0.is_empty() {
// simulate 100ns io
let a = Instant::now();
while a.elapsed() < Duration::from_nanos(100) {
hint::spin_loop();
}
// simulate io
spin(Duration::from_micros(8));
self.0.clear();
}
Ok(())
}
}
fn spin(duration: Duration) {
let a = Instant::now();
while a.elapsed() < duration {
hint::spin_loop();
}
}
fn benchmark(c: &mut Criterion) {
// Writes 100k small json text messages then flushes
c.bench_function("write 100k small texts then flush", |b| {
let mut ws = WebSocket::from_raw_socket(
MockSlowFlushWrite(Vec::with_capacity(MOCK_WRITE_LEN)),
MockWrite(Vec::with_capacity(MOCK_WRITE_LEN)),
tungstenite::protocol::Role::Server,
None,
);

@ -80,7 +80,7 @@ where
/// is returned.
/// In order to handle WouldBlock or Incomplete, call [`flush`](Self::flush) afterwards.
pub fn write(&mut self, frame: Frame) -> Result<()> {
self.codec.write_frame(&mut self.stream, frame)
self.codec.buffer_frame(&mut self.stream, frame)
}
/// Flush writes.
@ -99,6 +99,12 @@ pub(super) struct FrameCodec {
out_buffer: Vec<u8>,
/// Capacity limit for `out_buffer`.
max_out_buffer_len: usize,
/// Buffer target length to reach before writing to the stream
/// on calls to `buffer_frame`.
///
/// Setting this to non-zero will buffer small writes from hitting
/// the stream.
out_buffer_write_len: usize,
/// Header and remaining size of the incoming packet being processed.
header: Option<(FrameHeader, u64)>,
}
@ -110,6 +116,7 @@ impl FrameCodec {
in_buffer: ReadBuffer::new(),
out_buffer: Vec::new(),
max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
header: None,
}
}
@ -120,19 +127,20 @@ impl FrameCodec {
in_buffer: ReadBuffer::from_partially_read(part),
out_buffer: Vec::new(),
max_out_buffer_len: usize::MAX,
out_buffer_write_len: 0,
header: None,
}
}
/// Sets a maximum size for the out buffer.
pub(super) fn with_max_out_buffer_len(mut self, max: usize) -> Self {
pub(super) fn set_max_out_buffer_len(&mut self, max: usize) {
self.max_out_buffer_len = max;
self
}
/// Sets a maximum size for the out buffer.
pub(super) fn set_max_out_buffer_len(&mut self, max: usize) {
self.max_out_buffer_len = max;
/// Sets [`Self::buffer_frame`] buffer target length to reach before
/// writing to the stream.
pub(super) fn set_out_buffer_write_len(&mut self, len: usize) {
self.out_buffer_write_len = len;
}
/// Read a frame from the provided stream.
@ -193,10 +201,14 @@ impl FrameCodec {
Ok(Some(frame))
}
/// Write a frame to the provided stream.
/// Writes a frame into the `out_buffer`.
/// If the out buffer size is over the `out_buffer_write_len` will also write
/// the out buffer into the provided `stream`.
///
/// Does **not** flush.
pub(super) fn write_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()>
/// To ensure buffered frames are written call [`Self::write_out_buffer`].
///
/// May write to the stream, will **not** flush.
pub(super) fn buffer_frame<Stream>(&mut self, stream: &mut Stream, frame: Frame) -> Result<()>
where
Stream: Write,
{
@ -209,10 +221,14 @@ impl FrameCodec {
self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
if self.out_buffer.len() > self.out_buffer_write_len {
self.write_out_buffer(stream)
} else {
Ok(())
}
}
/// Write any buffered frames to the provided stream.
/// Writes the out_buffer to the provided stream.
///
/// Does **not** flush.
pub(super) fn write_out_buffer<Stream>(&mut self, stream: &mut Stream) -> Result<()>

@ -38,8 +38,17 @@ pub struct WebSocketConfig {
/// Does nothing, instead use `max_write_buffer_size`.
#[deprecated]
pub max_send_queue: Option<usize>,
/// The max size of the write buffer in bytes. Setting this can provide backpressure.
/// The target minimum size of the write buffer to reach before writing the data
/// to the underlying stream.
/// The default value is 128 KiB.
///
/// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
pub write_buffer_size: usize,
/// The max size of the write buffer in bytes. Setting this can provide backpressure
/// in the case the write buffer is filling up due to write errors.
/// The default value is unlimited.
///
/// Note: Should always be set higher than [`write_buffer_size`](Self::write_buffer_size).
pub max_write_buffer_size: usize,
/// The maximum size of a message. `None` means no size limit. The default value is 64 MiB
/// which should be reasonably big for all normal use-cases but small enough to prevent
@ -63,6 +72,7 @@ impl Default for WebSocketConfig {
#[allow(deprecated)]
WebSocketConfig {
max_send_queue: None,
write_buffer_size: 128 * 1024,
max_write_buffer_size: usize::MAX,
max_message_size: Some(64 << 20),
max_frame_size: Some(16 << 20),
@ -282,11 +292,20 @@ pub struct WebSocketContext {
impl WebSocketContext {
/// Create a WebSocket context that manages a post-handshake stream.
pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
let config = config.unwrap_or_default();
Self::_new(role, FrameCodec::new(), config.unwrap_or_default())
}
/// Create a WebSocket context that manages an post-handshake stream.
pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default())
}
WebSocketContext {
fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self {
frame.set_max_out_buffer_len(config.max_write_buffer_size);
frame.set_out_buffer_write_len(config.write_buffer_size);
Self {
role,
frame: FrameCodec::new().with_max_out_buffer_len(config.max_write_buffer_size),
frame,
state: WebSocketState::Active,
incomplete: None,
additional_send: None,
@ -294,20 +313,11 @@ impl WebSocketContext {
}
}
/// Create a WebSocket context that manages an post-handshake stream.
pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
let config = config.unwrap_or_default();
WebSocketContext {
frame: FrameCodec::from_partially_read(part)
.with_max_out_buffer_len(config.max_write_buffer_size),
..WebSocketContext::new(role, Some(config))
}
}
/// Change the configuration.
pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
set_func(&mut self.config);
self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
self.frame.set_out_buffer_write_len(self.config.write_buffer_size);
}
/// Read the configuration.
@ -412,6 +422,7 @@ impl WebSocketContext {
Stream: Read + Write,
{
self._write(stream, None)?;
self.frame.write_out_buffer(stream)?;
Ok(stream.flush()?)
}
@ -424,9 +435,8 @@ impl WebSocketContext {
where
Stream: Read + Write,
{
match data {
Some(data) => self.write_one_frame(stream, data)?,
None => self.frame.write_out_buffer(stream)?,
if let Some(data) = data {
self.buffer_frame(stream, data)?;
}
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
@ -434,7 +444,7 @@ impl WebSocketContext {
// respond with Pong frame as soon as is practical. (RFC 6455)
let should_flush = if let Some(msg) = self.additional_send.take() {
trace!("Sending pong/close");
match self.write_one_frame(stream, msg) {
match self.buffer_frame(stream, msg) {
Err(Error::WriteBufferFull(Message::Frame(msg))) => {
// if an system message would exceed the buffer put it back in
// `additional_send` for retry. Otherwise returning this error
@ -457,6 +467,7 @@ impl WebSocketContext {
// maximum segment lifetimes (2MSL), while there is no corresponding
// server impact as a TIME_WAIT connection is immediately reopened upon
// a new SYN with a higher seq number). (RFC 6455)
self.frame.write_out_buffer(stream)?;
self.state = WebSocketState::Terminated;
Err(Error::ConnectionClosed)
} else {
@ -468,7 +479,7 @@ impl WebSocketContext {
///
/// This function guarantees that the close frame will be queued.
/// There is no need to call it again. Calling this function is
/// the same as calling `write(Message::Close(..))`.
/// the same as calling `send(Message::Close(..))`.
pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
where
Stream: Read + Write,
@ -477,10 +488,8 @@ impl WebSocketContext {
self.state = WebSocketState::ClosedByUs;
let frame = Frame::close(code);
self._write(stream, Some(frame))?;
} else {
// Already closed, nothing to do.
}
Ok(stream.flush()?)
self.flush(stream)
}
/// Try to decode one message frame. May return None.
@ -650,8 +659,8 @@ impl WebSocketContext {
}
}
/// Write a single frame into the stream via the write-buffer.
fn write_one_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
/// Write a single frame into the write-buffer.
fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
where
Stream: Read + Write,
{
@ -665,7 +674,7 @@ impl WebSocketContext {
}
trace!("Sending frame: {:?}", frame);
self.frame.write_frame(stream, frame).check_connection_reset(self.state)
self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
}
/// Replace `additional_send` if it is currently a `Pong` message.

@ -0,0 +1,68 @@
use std::io::{self, Read, Write};
use tungstenite::{protocol::WebSocketConfig, Message, WebSocket};
/// `Write` impl that records call stats and drops the data.
#[derive(Debug, Default)]
struct MockWrite {
written_bytes: usize,
write_count: usize,
flush_count: usize,
}
impl Read for MockWrite {
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
Err(io::Error::new(io::ErrorKind::WouldBlock, "reads not supported"))
}
}
impl Write for MockWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.written_bytes += buf.len();
self.write_count += 1;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
self.flush_count += 1;
Ok(())
}
}
/// Test for write buffering and flushing behaviour.
#[test]
fn write_flush_behaviour() {
const SEND_ME_LEN: usize = 10;
const BATCH_ME_LEN: usize = 11;
const WRITE_BUFFER_SIZE: usize = 600;
let mut ws = WebSocket::from_raw_socket(
MockWrite::default(),
tungstenite::protocol::Role::Server,
Some(WebSocketConfig { write_buffer_size: WRITE_BUFFER_SIZE, ..<_>::default() }),
);
assert_eq!(ws.get_ref().written_bytes, 0);
assert_eq!(ws.get_ref().write_count, 0);
assert_eq!(ws.get_ref().flush_count, 0);
// `send` writes & flushes immediately
ws.send(Message::Text("Send me!".into())).unwrap();
assert_eq!(ws.get_ref().written_bytes, SEND_ME_LEN);
assert_eq!(ws.get_ref().write_count, 1);
assert_eq!(ws.get_ref().flush_count, 1);
// send a batch of messages
for msg in (0..100).map(|_| Message::Text("Batch me!".into())) {
ws.write(msg).unwrap();
}
// after 55 writes the out_buffer will exceed write_buffer_size=600
// and so do a single underlying write (not flushing).
assert_eq!(ws.get_ref().written_bytes, 55 * BATCH_ME_LEN + SEND_ME_LEN);
assert_eq!(ws.get_ref().write_count, 2);
assert_eq!(ws.get_ref().flush_count, 1);
// flushing will perform a single write for the remaining out_buffer & flush.
ws.flush().unwrap();
assert_eq!(ws.get_ref().written_bytes, 100 * BATCH_ME_LEN + SEND_ME_LEN);
assert_eq!(ws.get_ref().write_count, 3);
assert_eq!(ws.get_ref().flush_count, 2);
}
Loading…
Cancel
Save