Merge branch 'devel'

pull/1/head
Alexey Galakhov 7 years ago
commit 5e8005f002
  1. 33
      Cargo.toml
  2. 47
      examples/client.rs
  3. 119
      src/connect.rs
  4. 9
      src/lib.rs
  5. 90
      src/stream.rs

@ -6,15 +6,42 @@ keywords = ["websocket", "io", "web"]
authors = ["Daniel Abramov <dabramov@snapview.de>", "Alexey Galakhov <agalakhov@snapview.de>"]
license = "MIT"
homepage = "https://github.com/snapview/tokio-tungstenite"
documentation = "https://docs.rs/tokio-tungstenite/0.2.0"
documentation = "https://docs.rs/tokio-tungstenite/0.2.1"
repository = "https://github.com/snapview/tokio-tungstenite"
version = "0.2.0"
version = "0.2.1"
[features]
default = ["connect", "tls"]
connect = ["tokio-dns-unofficial", "tokio-core", "stream"]
tls = ["tokio-tls", "native-tls", "stream"]
stream = ["bytes"]
[dependencies]
futures = "0.1.13"
tokio-io = "0.1.1"
tungstenite = "0.2.3"
tungstenite = "0.2.4"
url = "1.4.0"
[dependencies.bytes]
optional = true
version = "*"
[dependencies.native-tls]
optional = true
version = "*"
[dependencies.tokio-dns-unofficial]
optional = true
version = "*"
[dependencies.tokio-core]
optional = true
version = "*"
[dependencies.tokio-tls]
optional = true
version = "*"
[dev-dependencies]
tokio-core = "0.1.7"

@ -18,16 +18,14 @@ extern crate url;
use std::env;
use std::io::{self, Read, Write};
use std::net::ToSocketAddrs;
use std::thread;
use futures::sync::mpsc;
use futures::{Future, Sink, Stream};
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tungstenite::protocol::Message;
use tokio_tungstenite::client_async;
use tokio_tungstenite::connect_async;
fn main() {
// Specify the server address to which the client will be connecting.
@ -35,14 +33,11 @@ fn main() {
panic!("this program requires at least one argument")
});
// Get a first IP address of the server from the server URL.
let url = url::Url::parse(&connect_addr).unwrap();
let addr = url.to_socket_addrs().unwrap().next().unwrap();
// Create the event loop and initiate the connection to the remote server.
let mut core = Core::new().unwrap();
let handle = core.handle();
let tcp = TcpStream::connect(&addr, &handle);
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
@ -68,30 +63,28 @@ fn main() {
// finishes. If we don't have any more data to read or we won't receive any
// more work from the remote then we can exit.
let mut stdout = io::stdout();
let client = tcp.and_then(|stream| {
client_async(url, stream).and_then(|ws_stream| {
println!("WebSocket handshake has been successfully completed");
let client = connect_async(url, handle.remote().clone()).and_then(|ws_stream| {
println!("WebSocket handshake has been successfully completed");
// `sink` is the stream of messages going out.
// `stream` is the stream of incoming messages.
let (sink, stream) = ws_stream.split();
// `sink` is the stream of messages going out.
// `stream` is the stream of incoming messages.
let (sink, stream) = ws_stream.split();
// We forward all messages, composed out of the data, entered to
// the stdin, to the `sink`.
let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(|message| {
stdout.write_all(&message.into_data()).unwrap();
Ok(())
});
// We forward all messages, composed out of the data, entered to
// the stdin, to the `sink`.
let send_stdin = stdin_rx.forward(sink);
let write_stdout = stream.for_each(|message| {
stdout.write_all(&message.into_data()).unwrap();
Ok(())
});
// Wait for either of futures to complete.
send_stdin.map(|_| ())
.select(write_stdout.map(|_| ()))
.then(|_| Ok(()))
}).map_err(|e| {
println!("Error during the websocket handshake occurred: {}", e);
io::Error::new(io::ErrorKind::Other, e)
})
// Wait for either of futures to complete.
send_stdin.map(|_| ())
.select(write_stdout.map(|_| ()))
.then(|_| Ok(()))
}).map_err(|e| {
println!("Error during the websocket handshake occurred: {}", e);
io::Error::new(io::ErrorKind::Other, e)
});
// And now that we've got our client, we execute it in the event loop!

@ -0,0 +1,119 @@
//! Connection helper.
extern crate tokio_dns;
extern crate tokio_core;
use self::tokio_dns::tcp_connect;
use self::tokio_core::reactor::Remote;
use std::io::Result as IoResult;
use futures::{Future, BoxFuture};
use futures::future;
use super::{WebSocketStream, Request, client_async};
use tungstenite::Error;
use tungstenite::client::url_mode;
use stream::NoDelay;
use self::tokio_core::net::TcpStream;
impl NoDelay for TcpStream {
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
TcpStream::set_nodelay(self, nodelay)
}
}
#[cfg(feature="tls")]
mod encryption {
extern crate native_tls;
extern crate tokio_tls;
use super::tokio_core::net::TcpStream;
use self::native_tls::TlsConnector;
use self::tokio_tls::{TlsConnectorExt, TlsStream};
use std::io::{Read, Write, Result as IoResult};
use futures::{Future, BoxFuture};
use futures::future;
use tungstenite::Error;
use tungstenite::stream::Mode;
use stream::NoDelay;
pub use stream::Stream as StreamSwitcher;
pub type AutoStream = StreamSwitcher<TcpStream, TlsStream<TcpStream>>;
impl<T: Read + Write + NoDelay> NoDelay for TlsStream<T> {
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
}
pub fn wrap_stream(socket: TcpStream, domain: String, mode: Mode) -> BoxFuture<AutoStream, Error> {
match mode {
Mode::Plain => future::ok(StreamSwitcher::Plain(socket)).boxed(),
Mode::Tls => {
future::result(TlsConnector::builder())
.and_then(move |builder| future::result(builder.build()))
.and_then(move |connector| connector.connect_async(&domain, socket))
.map(|s| StreamSwitcher::Tls(s))
.map_err(|e| Error::Tls(e))
.boxed()
}
}
}
}
#[cfg(not(feature="tls"))]
mod encryption {
use super::tokio_core::net::TcpStream;
use futures::{Future, BoxFuture};
use futures::future;
use tungstenite::Error;
use tungstenite::stream::Mode;
pub type AutoStream = TcpStream;
pub fn wrap_stream(socket: TcpStream, _domain: String, mode: Mode) -> BoxFuture<AutoStream, Error> {
match mode {
Mode::Plain => future::ok(socket).boxed(),
Mode::Tls => future::err(Error::Url("TLS support not compiled in.".into())).boxed(),
}
}
}
use self::encryption::{AutoStream, wrap_stream};
/// Connect to a given URL.
pub fn connect_async<R>(request: R, handle: Remote) -> BoxFuture<WebSocketStream<AutoStream>, Error>
where R: Into<Request<'static>>
{
let request: Request = request.into();
// Make sure we check domain and mode first. URL must be valid.
let mode = match url_mode(&request.url) {
Ok(m) => m,
Err(e) => return future::err(e.into()).boxed(),
};
let domain = match request.url.host_str() {
Some(d) => d.to_string(),
None => return future::err(Error::Url("No host name in the URL".into())).boxed(),
};
let port = request.url.port_or_known_default().expect("Bug: port unknown");
tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into())
.and_then(move |socket| wrap_stream(socket, domain, mode))
.and_then(|mut stream| {
NoDelay::set_nodelay(&mut stream, true)
.map(move |()| stream)
.map_err(|e| e.into())
})
.and_then(move |stream| client_async(request, stream))
.boxed()
}

@ -25,6 +25,12 @@ extern crate tokio_io;
extern crate tungstenite;
extern crate url;
#[cfg(feature="connect")]
mod connect;
#[cfg(feature="stream")]
pub mod stream;
use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
@ -39,6 +45,9 @@ use tungstenite::protocol::{WebSocket, Message};
use tungstenite::error::Error as WsError;
use tungstenite::server;
#[cfg(feature="connect")]
pub use connect::connect_async;
/// A WebSocket request
pub struct Request<'a> {
/// URL of the request.

@ -0,0 +1,90 @@
//! Convenience wrapper for streams to switch between plain TCP and TLS at runtime.
//!
//! There is no dependency on actual TLS implementations. Everything like
//! `native_tls` or `openssl` will work as long as there is a TLS stream supporting standard
//! `Read + Write` traits.
extern crate bytes;
use std::io::{Read, Write, Result as IoResult, Error as IoError};
use self::bytes::{Buf, BufMut};
use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite};
/// Trait to switch TCP_NODELAY.
pub trait NoDelay {
/// Set the TCP_NODELAY option to the given value.
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()>;
}
/// Stream, either plain TCP or TLS.
pub enum Stream<S, T> {
/// Unencrypted socket stream.
Plain(S),
/// Encrypted socket stream.
Tls(T),
}
impl<S: Read, T: Read> Read for Stream<S, T> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
match *self {
Stream::Plain(ref mut s) => s.read(buf),
Stream::Tls(ref mut s) => s.read(buf),
}
}
}
impl<S: Write, T: Write> Write for Stream<S, T> {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
match *self {
Stream::Plain(ref mut s) => s.write(buf),
Stream::Tls(ref mut s) => s.write(buf),
}
}
fn flush(&mut self) -> IoResult<()> {
match *self {
Stream::Plain(ref mut s) => s.flush(),
Stream::Tls(ref mut s) => s.flush(),
}
}
}
impl<S: NoDelay, T: NoDelay> NoDelay for Stream<S, T> {
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
match *self {
Stream::Plain(ref mut s) => s.set_nodelay(nodelay),
Stream::Tls(ref mut s) => s.set_nodelay(nodelay),
}
}
}
impl<S: AsyncRead, T: AsyncRead> AsyncRead for Stream<S, T> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match *self {
Stream::Plain(ref s) => s.prepare_uninitialized_buffer(buf),
Stream::Tls(ref s) => s.prepare_uninitialized_buffer(buf),
}
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, IoError> {
match *self {
Stream::Plain(ref mut s) => s.read_buf(buf),
Stream::Tls(ref mut s) => s.read_buf(buf),
}
}
}
impl<S: AsyncWrite, T: AsyncWrite> AsyncWrite for Stream<S, T> {
fn shutdown(&mut self) -> Poll<(), IoError> {
match *self {
Stream::Plain(ref mut s) => s.shutdown(),
Stream::Tls(ref mut s) => s.shutdown(),
}
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, IoError> {
match *self {
Stream::Plain(ref mut s) => s.write_buf(buf),
Stream::Tls(ref mut s) => s.write_buf(buf),
}
}
}
Loading…
Cancel
Save