Use input_buffer as separate crate.

Signed-off-by: Alexey Galakhov <agalakhov@snapview.de>
pull/24/merge v0.5.1
Alexey Galakhov 7 years ago
parent 65a47f95a6
commit 6f8c4e7034
  1. 5
      Cargo.toml
  2. 8
      src/handshake/machine.rs
  3. 89
      src/input_buffer.rs
  4. 3
      src/lib.rs
  5. 9
      src/protocol/frame/mod.rs

@ -7,9 +7,9 @@ authors = ["Alexey Galakhov"]
license = "MIT/Apache-2.0"
readme = "README.md"
homepage = "https://github.com/snapview/tungstenite-rs"
documentation = "https://docs.rs/tungstenite/0.5.0"
documentation = "https://docs.rs/tungstenite/0.5.1"
repository = "https://github.com/snapview/tungstenite-rs"
version = "0.5.0"
version = "0.5.1"
[features]
default = ["tls"]
@ -20,6 +20,7 @@ base64 = "0.6.0"
byteorder = "1.1.0"
bytes = "0.4.4"
httparse = "1.2.3"
input_buffer = "0.1.1"
log = "0.3.8"
rand = "0.3.16"
sha1 = "0.2.0"

@ -42,9 +42,11 @@ impl<Stream: Read + Write> HandshakeMachine<Stream> {
trace!("Doing handshake round.");
match self.state {
HandshakeState::Reading(mut buf) => {
buf.reserve(MIN_READ, usize::max_value()) // TODO limit size
.map_err(|_| Error::Capacity("Header too long".into()))?;
match buf.read_from(&mut self.stream).no_block()? {
let read = buf.prepare_reserve(MIN_READ)
.with_limit(usize::max_value()) // TODO limit size
.map_err(|_| Error::Capacity("Header too long".into()))?
.read_from(&mut self.stream).no_block()?;
match read {
Some(0) => {
Err(Error::Protocol("Handshake not finished".into()))
}

@ -1,89 +0,0 @@
use std::io::{Cursor, Read, Result as IoResult};
use bytes::{Buf, BufMut};
/// A FIFO buffer for reading packets from network.
pub struct InputBuffer(Cursor<Vec<u8>>);
/// The minimum read size.
pub const MIN_READ: usize = 4096;
/// Size limit error.
pub struct SizeLimit;
impl InputBuffer {
/// Create a new empty one.
pub fn with_capacity(capacity: usize) -> Self {
InputBuffer(Cursor::new(Vec::with_capacity(capacity)))
}
/// Create a new one from partially read data.
pub fn from_partially_read(part: Vec<u8>) -> Self {
InputBuffer(Cursor::new(part))
}
/// Reserve the given amount of space.
pub fn reserve(&mut self, space: usize, limit: usize) -> Result<(), SizeLimit>{
let remaining = self.inp_mut().capacity() - self.inp_mut().len();
if remaining >= space {
// We have enough space right now.
Ok(())
} else {
let pos = self.out().position() as usize;
self.inp_mut().drain(0..pos);
self.out_mut().set_position(0);
let avail = self.inp_mut().capacity() - self.inp_mut().len();
if space <= avail {
Ok(())
} else if self.inp_mut().capacity() + space > limit {
Err(SizeLimit)
} else {
self.inp_mut().reserve(space - avail);
Ok(())
}
}
}
/// Read data from stream into the buffer.
pub fn read_from<S: Read>(&mut self, stream: &mut S) -> IoResult<usize> {
let size;
let buf = self.inp_mut();
unsafe {
size = stream.read(buf.bytes_mut())?;
buf.advance_mut(size);
}
Ok(size)
}
/// Get the rest of the buffer and destroy the buffer.
pub fn into_vec(mut self) -> Vec<u8> {
let pos = self.out().position() as usize;
self.inp_mut().drain(0..pos);
self.0.into_inner()
}
/// The output end (to the application).
pub fn out(&self) -> &Cursor<Vec<u8>> {
&self.0 // the cursor itself
}
/// The output end (to the application).
pub fn out_mut(&mut self) -> &mut Cursor<Vec<u8>> {
&mut self.0 // the cursor itself
}
/// The input end (to the network).
fn inp_mut(&mut self) -> &mut Vec<u8> {
self.0.get_mut() // underlying vector
}
}
impl Buf for InputBuffer {
fn remaining(&self) -> usize {
Buf::remaining(self.out())
}
fn bytes(&self) -> &[u8] {
Buf::bytes(self.out())
}
fn advance(&mut self, size: usize) {
Buf::advance(self.out_mut(), size)
}
}

@ -14,6 +14,7 @@ extern crate base64;
extern crate byteorder;
extern crate bytes;
extern crate httparse;
extern crate input_buffer;
extern crate rand;
extern crate sha1;
extern crate url;
@ -28,8 +29,6 @@ pub mod handshake;
pub mod stream;
pub mod util;
mod input_buffer;
pub use client::{connect, client};
pub use server::{accept, accept_hdr};
pub use error::{Error, Result};

@ -57,14 +57,15 @@ impl<Stream> FrameSocket<Stream>
/// Read a frame from stream.
pub fn read_frame(&mut self) -> Result<Option<Frame>> {
loop {
if let Some(frame) = Frame::parse(&mut self.in_buffer.out_mut())? {
if let Some(frame) = Frame::parse(&mut self.in_buffer.as_cursor_mut())? {
trace!("received frame {}", frame);
return Ok(Some(frame));
}
// No full frames in buffer.
self.in_buffer.reserve(MIN_READ, usize::max_value())
.map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))?;
let size = self.in_buffer.read_from(&mut self.stream)?;
let size = self.in_buffer.prepare_reserve(MIN_READ)
.with_limit(usize::max_value())
.map_err(|_| Error::Capacity("Incoming TCP buffer is full".into()))?
.read_from(&mut self.stream)?;
if size == 0 {
trace!("no frame received");
return Ok(None)

Loading…
Cancel
Save