From 6f8c4e70349312ce87c57b7d9515ed14dc5ccb6a Mon Sep 17 00:00:00 2001 From: Alexey Galakhov Date: Tue, 12 Dec 2017 00:39:59 +0100 Subject: [PATCH] Use input_buffer as separate crate. Signed-off-by: Alexey Galakhov --- Cargo.toml | 5 ++- src/handshake/machine.rs | 8 ++-- src/input_buffer.rs | 89 --------------------------------------- src/lib.rs | 3 +- src/protocol/frame/mod.rs | 9 ++-- 5 files changed, 14 insertions(+), 100 deletions(-) delete mode 100644 src/input_buffer.rs diff --git a/Cargo.toml b/Cargo.toml index 2c58e0a..39ccb2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"] license = "MIT/Apache-2.0" readme = "README.md" homepage = "https://github.com/snapview/tungstenite-rs" -documentation = "https://docs.rs/tungstenite/0.5.0" +documentation = "https://docs.rs/tungstenite/0.5.1" repository = "https://github.com/snapview/tungstenite-rs" -version = "0.5.0" +version = "0.5.1" [features] default = ["tls"] @@ -20,6 +20,7 @@ base64 = "0.6.0" byteorder = "1.1.0" bytes = "0.4.4" httparse = "1.2.3" +input_buffer = "0.1.1" log = "0.3.8" rand = "0.3.16" sha1 = "0.2.0" diff --git a/src/handshake/machine.rs b/src/handshake/machine.rs index ec98f7e..bacc866 100644 --- a/src/handshake/machine.rs +++ b/src/handshake/machine.rs @@ -42,9 +42,11 @@ impl HandshakeMachine { trace!("Doing handshake round."); match self.state { HandshakeState::Reading(mut buf) => { - buf.reserve(MIN_READ, usize::max_value()) // TODO limit size - .map_err(|_| Error::Capacity("Header too long".into()))?; - match buf.read_from(&mut self.stream).no_block()? { + let read = buf.prepare_reserve(MIN_READ) + .with_limit(usize::max_value()) // TODO limit size + .map_err(|_| Error::Capacity("Header too long".into()))? + .read_from(&mut self.stream).no_block()?; + match read { Some(0) => { Err(Error::Protocol("Handshake not finished".into())) } diff --git a/src/input_buffer.rs b/src/input_buffer.rs deleted file mode 100644 index c2a87c0..0000000 --- a/src/input_buffer.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::io::{Cursor, Read, Result as IoResult}; -use bytes::{Buf, BufMut}; - -/// A FIFO buffer for reading packets from network. -pub struct InputBuffer(Cursor>); - -/// The minimum read size. -pub const MIN_READ: usize = 4096; - -/// Size limit error. -pub struct SizeLimit; - -impl InputBuffer { - /// Create a new empty one. - pub fn with_capacity(capacity: usize) -> Self { - InputBuffer(Cursor::new(Vec::with_capacity(capacity))) - } - - /// Create a new one from partially read data. - pub fn from_partially_read(part: Vec) -> Self { - InputBuffer(Cursor::new(part)) - } - - /// Reserve the given amount of space. - pub fn reserve(&mut self, space: usize, limit: usize) -> Result<(), SizeLimit>{ - let remaining = self.inp_mut().capacity() - self.inp_mut().len(); - if remaining >= space { - // We have enough space right now. - Ok(()) - } else { - let pos = self.out().position() as usize; - self.inp_mut().drain(0..pos); - self.out_mut().set_position(0); - let avail = self.inp_mut().capacity() - self.inp_mut().len(); - if space <= avail { - Ok(()) - } else if self.inp_mut().capacity() + space > limit { - Err(SizeLimit) - } else { - self.inp_mut().reserve(space - avail); - Ok(()) - } - } - } - - /// Read data from stream into the buffer. - pub fn read_from(&mut self, stream: &mut S) -> IoResult { - let size; - let buf = self.inp_mut(); - unsafe { - size = stream.read(buf.bytes_mut())?; - buf.advance_mut(size); - } - Ok(size) - } - - /// Get the rest of the buffer and destroy the buffer. - pub fn into_vec(mut self) -> Vec { - let pos = self.out().position() as usize; - self.inp_mut().drain(0..pos); - self.0.into_inner() - } - - /// The output end (to the application). - pub fn out(&self) -> &Cursor> { - &self.0 // the cursor itself - } - /// The output end (to the application). - pub fn out_mut(&mut self) -> &mut Cursor> { - &mut self.0 // the cursor itself - } - - /// The input end (to the network). - fn inp_mut(&mut self) -> &mut Vec { - self.0.get_mut() // underlying vector - } -} - -impl Buf for InputBuffer { - fn remaining(&self) -> usize { - Buf::remaining(self.out()) - } - fn bytes(&self) -> &[u8] { - Buf::bytes(self.out()) - } - fn advance(&mut self, size: usize) { - Buf::advance(self.out_mut(), size) - } -} diff --git a/src/lib.rs b/src/lib.rs index e3ca3d6..6dd9fb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ extern crate base64; extern crate byteorder; extern crate bytes; extern crate httparse; +extern crate input_buffer; extern crate rand; extern crate sha1; extern crate url; @@ -28,8 +29,6 @@ pub mod handshake; pub mod stream; pub mod util; -mod input_buffer; - pub use client::{connect, client}; pub use server::{accept, accept_hdr}; pub use error::{Error, Result}; diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 77fae36..49f9c8c 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -57,14 +57,15 @@ impl FrameSocket /// Read a frame from stream. pub fn read_frame(&mut self) -> Result> { loop { - if let Some(frame) = Frame::parse(&mut self.in_buffer.out_mut())? { + if let Some(frame) = Frame::parse(&mut self.in_buffer.as_cursor_mut())? { trace!("received frame {}", frame); return Ok(Some(frame)); } // No full frames in buffer. - self.in_buffer.reserve(MIN_READ, usize::max_value()) - .map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))?; - let size = self.in_buffer.read_from(&mut self.stream)?; + let size = self.in_buffer.prepare_reserve(MIN_READ) + .with_limit(usize::max_value()) + .map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))? + .read_from(&mut self.stream)?; if size == 0 { trace!("no frame received"); return Ok(None)