Add "connect" convenience function.

Signed-off-by: Alexey Galakhov <agalakhov@snapview.de>
pull/1/head
Alexey Galakhov 8 years ago
parent c8dc978dd5
commit 5fbcd8d413
  1. 25
      Cargo.toml
  2. 93
      src/connect.rs
  3. 8
      src/lib.rs
  4. 73
      src/stream.rs

@ -10,11 +10,36 @@ documentation = "https://docs.rs/tokio-tungstenite"
repository = "https://github.com/snapview/tokio-tungstenite" repository = "https://github.com/snapview/tokio-tungstenite"
version = "0.2.1" version = "0.2.1"
[features]
default = ["connect", "tls"]
connect = ["tokio-dns-unofficial", "tokio-core"]
tls = ["tokio-tls", "native-tls", "bytes"]
[dependencies] [dependencies]
futures = "*" futures = "*"
tokio-io = "*" tokio-io = "*"
tungstenite = "*" tungstenite = "*"
url = "*" url = "*"
[dependencies.bytes]
optional = true
version = "*"
[dependencies.native-tls]
optional = true
version = "*"
[dependencies.tokio-dns-unofficial]
optional = true
version = "*"
[dependencies.tokio-core]
optional = true
version = "*"
[dependencies.tokio-tls]
optional = true
version = "*"
[dev-dependencies] [dev-dependencies]
tokio-core = "*" tokio-core = "*"

@ -0,0 +1,93 @@
//! Connection helper.
extern crate tokio_dns;
extern crate tokio_core;
use self::tokio_dns::tcp_connect;
use self::tokio_core::reactor::Remote;
use futures::{Future, BoxFuture};
use futures::future;
use super::{WebSocketStream, Request, client_async};
use tungstenite::Error;
use tungstenite::client::url_mode;
#[cfg(feature="tls")]
mod encryption {
extern crate native_tls;
extern crate tokio_tls;
use super::tokio_core::net::TcpStream;
use self::native_tls::TlsConnector;
use self::tokio_tls::{TlsConnectorExt, TlsStream};
use futures::{Future, BoxFuture};
use futures::future;
use tungstenite::Error;
use tungstenite::stream::Mode;
pub use stream::Stream as StreamSwitcher;
pub type AutoStream = StreamSwitcher<TcpStream, TlsStream<TcpStream>>;
pub fn wrap_stream(socket: TcpStream, domain: String, mode: Mode) -> BoxFuture<AutoStream, Error> {
match mode {
Mode::Plain => future::ok(StreamSwitcher::Plain(socket)).boxed(),
Mode::Tls => {
future::result(TlsConnector::builder())
.and_then(move |builder| future::result(builder.build()))
.and_then(move |connector| connector.connect_async(&domain, socket))
.map(|s| StreamSwitcher::Tls(s))
.map_err(|e| Error::Tls(e))
.boxed()
}
}
}
}
#[cfg(not(feature="tls"))]
mod encryption {
use super::tokio_core::net::TcpStream;
use futures::{Future, BoxFuture};
use futures::future;
use tungstenite::Error;
use tungstenite::stream::Mode;
pub type AutoStream = TcpStream;
pub fn wrap_stream(socket: TcpStream, _domain: String, mode: Mode) -> BoxFuture<AutoStream, Error> {
match mode {
Mode::Plain => future::ok(socket).boxed(),
Mode::Tls => future::err(Error::Url("TLS support not compiled in.".into())).boxed(),
}
}
}
use self::encryption::{AutoStream, wrap_stream};
/// Connect to a given URL.
pub fn connect_async<R>(request: R, handle: Remote) -> BoxFuture<WebSocketStream<AutoStream>, Error>
where R: Into<Request<'static>>
{
let request: Request = request.into();
// Make sure we check domain and mode first. URL must be valid.
let mode = match url_mode(&request.url) {
Ok(m) => m,
Err(e) => return future::err(e.into()).boxed(),
};
let domain = match request.url.host_str() {
Some(d) => d.to_string(),
None => return future::err(Error::Url("No host name in the URL".into())).boxed(),
};
let port = request.url.port_or_known_default().expect("Bug: port unknown");
tcp_connect((domain.as_str(), port), handle).map_err(|e| e.into())
.and_then(move |socket| wrap_stream(socket, domain, mode))
.and_then(move |stream| client_async(request, stream))
.boxed()
}

@ -25,6 +25,11 @@ extern crate tokio_io;
extern crate tungstenite; extern crate tungstenite;
extern crate url; extern crate url;
#[cfg(feature="connect")]
mod connect;
#[cfg(all(feature="connect", feature="tls"))]
mod stream;
use std::io::ErrorKind; use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend}; use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
@ -39,6 +44,9 @@ use tungstenite::protocol::{WebSocket, Message};
use tungstenite::error::Error as WsError; use tungstenite::error::Error as WsError;
use tungstenite::server; use tungstenite::server;
#[cfg(feature="connect")]
pub use connect::connect_async;
/// A WebSocket request /// A WebSocket request
pub struct Request<'a> { pub struct Request<'a> {
/// URL of the request. /// URL of the request.

@ -0,0 +1,73 @@
//! Convenience wrapper for streams to switch between plain TCP and TLS at runtime.
//!
//! There is no dependency on actual TLS implementations. Everything like
//! `native_tls` or `openssl` will work as long as there is a TLS stream supporting standard
//! `Read + Write` traits.
extern crate bytes;
use std::io::{Read, Write, Result as IoResult, Error as IoError};
use self::bytes::{Buf, BufMut};
use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite};
/// Stream, either plain TCP or TLS.
pub enum Stream<S, T> {
Plain(S),
Tls(T),
}
impl<S: Read, T: Read> Read for Stream<S, T> {
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
match *self {
Stream::Plain(ref mut s) => s.read(buf),
Stream::Tls(ref mut s) => s.read(buf),
}
}
}
impl<S: Write, T: Write> Write for Stream<S, T> {
fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
match *self {
Stream::Plain(ref mut s) => s.write(buf),
Stream::Tls(ref mut s) => s.write(buf),
}
}
fn flush(&mut self) -> IoResult<()> {
match *self {
Stream::Plain(ref mut s) => s.flush(),
Stream::Tls(ref mut s) => s.flush(),
}
}
}
impl<S: AsyncRead, T: AsyncRead> AsyncRead for Stream<S, T> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match *self {
Stream::Plain(ref s) => s.prepare_uninitialized_buffer(buf),
Stream::Tls(ref s) => s.prepare_uninitialized_buffer(buf),
}
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, IoError> {
match *self {
Stream::Plain(ref mut s) => s.read_buf(buf),
Stream::Tls(ref mut s) => s.read_buf(buf),
}
}
}
impl<S: AsyncWrite, T: AsyncWrite> AsyncWrite for Stream<S, T> {
fn shutdown(&mut self) -> Poll<(), IoError> {
match *self {
Stream::Plain(ref mut s) => s.shutdown(),
Stream::Tls(ref mut s) => s.shutdown(),
}
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, IoError> {
match *self {
Stream::Plain(ref mut s) => s.write_buf(buf),
Stream::Tls(ref mut s) => s.write_buf(buf),
}
}
}
Loading…
Cancel
Save