diff --git a/examples/autobahn-server.rs b/examples/autobahn-server.rs new file mode 100644 index 0000000..8d0ac03 --- /dev/null +++ b/examples/autobahn-server.rs @@ -0,0 +1,36 @@ +#[macro_use] extern crate log; +extern crate env_logger; +extern crate ws2; + +use std::net::{TcpListener, TcpStream}; +use std::thread::spawn; + +use ws2::server::accept; +use ws2::error::Result; +use ws2::handshake::Handshake; + +fn handle_client(stream: TcpStream) -> Result<()> { + let mut socket = accept(stream).handshake_wait()?; + loop { + let msg = socket.read_message()?; + socket.write_message(msg)?; + } +} + +fn main() { + env_logger::init().unwrap(); + + let server = TcpListener::bind("127.0.0.1:9001").unwrap(); + + for stream in server.incoming() { + spawn(move || { + match stream { + Ok(stream) => match handle_client(stream) { + Ok(_) => (), + Err(e) => warn!("Error in client: {}", e), + }, + Err(e) => warn!("Error accepting stream: {}", e), + } + }); + } +} diff --git a/src/handshake/server.rs b/src/handshake/server.rs index 3b3bc7b..dcab055 100644 --- a/src/handshake/server.rs +++ b/src/handshake/server.rs @@ -1,9 +1,20 @@ +use std::io::{Cursor, Read, Write}; use bytes::Buf; use httparse; use httparse::Status; +use input_buffer::{InputBuffer, MIN_READ}; use error::{Error, Result}; -use super::{Headers, Httparse, FromHttparse, convert_key, MAX_HEADERS}; +use super::{ + Handshake, + HandshakeResult, + Headers, + Httparse, + FromHttparse, + convert_key, + MAX_HEADERS +}; +use protocol::{WebSocket, Role}; /// Request from the client. pub struct Request { @@ -56,6 +67,64 @@ impl<'h, 'b: 'h> FromHttparse> for Request { } } +/// Server handshake +pub struct ServerHandshake { + stream: Stream, + state: HandshakeState, +} + +impl ServerHandshake { + /// Start a new server handshake on top of given stream. + pub fn new(stream: Stream) -> Self { + ServerHandshake { + stream: stream, + state: HandshakeState::ReceivingRequest(InputBuffer::with_capacity(MIN_READ)), + } + } +} + +impl Handshake for ServerHandshake { + type Stream = WebSocket; + fn handshake(mut self) -> Result> { + debug!("Performing server handshake..."); + match self.state { + HandshakeState::ReceivingRequest(mut req_buf) => { + req_buf.reserve(MIN_READ, usize::max_value()) + .map_err(|_| Error::Capacity("Header too long".into()))?; + req_buf.read_from(&mut self.stream)?; + let state = if let Some(req) = Request::parse(&mut req_buf)? { + let resp = req.reply()?; + HandshakeState::SendingResponse(Cursor::new(resp)) + } else { + HandshakeState::ReceivingRequest(req_buf) + }; + Ok(HandshakeResult::Incomplete(ServerHandshake { + state: state, + ..self + })) + } + HandshakeState::SendingResponse(mut resp) => { + let size = self.stream.write(Buf::bytes(&resp))?; + Buf::advance(&mut resp, size); + if resp.has_remaining() { + Ok(HandshakeResult::Incomplete(ServerHandshake { + state: HandshakeState::SendingResponse(resp), + ..self + })) + } else { + let ws = WebSocket::from_raw_socket(self.stream, Role::Server); + Ok(HandshakeResult::Done(ws)) + } + } + } + } +} + +enum HandshakeState { + ReceivingRequest(InputBuffer), + SendingResponse(Cursor>), +} + #[cfg(test)] mod tests { diff --git a/src/lib.rs b/src/lib.rs index fbd9281..d6185a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ extern crate utf8; pub mod error; pub mod protocol; pub mod client; +pub mod server; pub mod handshake; mod input_buffer; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..fca1368 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,8 @@ +use std::net::TcpStream; + +use handshake::server::ServerHandshake; + +/// Accept the given TcpStream as a WebSocket. +pub fn accept(stream: TcpStream) -> ServerHandshake { + ServerHandshake::new(stream) +}