Update to the newest `tungstenite-rs` version

* Fixes #35
* Replaces https://github.com/snapview/tokio-tungstenite/pull/40
* Gives a good background for https://github.com/snapview/tokio-tungstenite/pull/41
pull/1/head
Daniel Abramov 7 years ago
parent 8b3f811be7
commit b61ed96561
  1. 7
      Cargo.toml
  2. 3
      src/connect.rs
  3. 106
      src/lib.rs

@ -20,7 +20,7 @@ stream = ["bytes"]
futures = "0.1.17"
[dependencies.tungstenite]
version = "0.5.3"
version = "0.6.0"
default-features = false
[dependencies.bytes]
@ -29,7 +29,7 @@ version = "0.4.6"
[dependencies.native-tls]
optional = true
version = "0.1.5"
version = "0.2.0"
[dependencies.tokio-dns-unofficial]
optional = true
@ -46,3 +46,6 @@ version = "0.1.4"
[dev-dependencies]
tokio-core = "0.1.12"
url = "1.6.0"
[patch.crates-io]
tokio-tls = { git = "https://github.com/aep/tokio-tls.git", rev = "7865734d2167160cabd4422aca76b8478e643b41" }

@ -57,8 +57,7 @@ mod encryption {
match mode {
Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))),
Mode::Tls => {
Box::new(future::result(TlsConnector::builder())
.and_then(move |builder| future::result(builder.build()))
Box::new(future::result(TlsConnector::builder().build())
.and_then(move |connector| connector.connect_async(&domain, socket))
.map(|s| StreamSwitcher::Tls(s))
.map_err(|e| Error::Tls(e)))

@ -31,12 +31,16 @@ use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
use tokio::io::{AsyncRead, AsyncWrite};
use tungstenite::handshake::client::{ClientHandshake, Response, Request};
use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback};
use tungstenite::handshake::{HandshakeRole, HandshakeError};
use tungstenite::protocol::{WebSocket, Message, Role};
use tungstenite::error::Error as WsError;
use tungstenite::server;
use tungstenite::{
error::Error as WsError,
handshake::{
HandshakeRole, HandshakeError,
client::{ClientHandshake, Response, Request},
server::{ServerHandshake, Callback, NoCallback},
},
protocol::{WebSocket, Message, Role, WebSocketConfig},
server,
};
#[cfg(feature="connect")]
pub use connect::{connect_async, client_async_tls};
@ -53,14 +57,31 @@ pub use connect::{connect_async, client_async_tls};
///
/// This is typically used for clients who have already established, for
/// example, a TCP connection to the remote server.
pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync<S>
pub fn client_async<'a, R, S>(
request: R,
stream: S,
) -> ConnectAsync<S>
where
R: Into<Request<'a>>,
S: AsyncRead + AsyncWrite
{
client_async_with_config(request, stream, None)
}
/// The same as `client_async()` but the one can specify a websocket configuration.
/// Please refer to `client_async()` for more details.
pub fn client_async_with_config<'a, R, S>(
request: R,
stream: S,
config: Option<WebSocketConfig>,
) -> ConnectAsync<S>
where
R: Into<Request<'a>>,
S: AsyncRead + AsyncWrite
{
ConnectAsync {
inner: MidHandshake {
inner: Some(ClientHandshake::start(stream, request.into()).handshake())
inner: Some(ClientHandshake::start(stream, request.into(), config).handshake())
}
}
}
@ -83,19 +104,45 @@ where
accept_hdr_async(stream, NoCallback)
}
/// The same as `accept_async()` but the one can specify a websocket configuration.
/// Please refer to `accept_async()` for more details.
pub fn accept_async_with_config<S>(
stream: S,
config: Option<WebSocketConfig>,
) -> AcceptAsync<S, NoCallback>
where
S: AsyncRead + AsyncWrite,
{
accept_hdr_async_with_config(stream, NoCallback, config)
}
/// Accepts a new WebSocket connection with the provided stream.
///
/// This function does the same as `accept_async()` but accepts an extra callback
/// for header processing. The callback receives headers of the incoming
/// requests and is able to add extra headers to the reply.
pub fn accept_hdr_async<S, C>(stream: S, callback: C) -> AcceptAsync<S, C>
where
S: AsyncRead + AsyncWrite,
C: Callback,
{
accept_hdr_async_with_config(stream, callback, None)
}
/// The same as `accept_hdr_async()` but the one can specify a websocket configuration.
/// Please refer to `accept_hdr_async()` for more details.
pub fn accept_hdr_async_with_config<S, C>(
stream: S,
callback: C,
config: Option<WebSocketConfig>,
) -> AcceptAsync<S, C>
where
S: AsyncRead + AsyncWrite,
C: Callback,
{
AcceptAsync {
inner: MidHandshake {
inner: Some(server::accept_hdr(stream, callback))
inner: Some(server::accept_hdr_with_config(stream, callback, config))
}
}
}
@ -116,15 +163,24 @@ pub struct WebSocketStream<S> {
impl<S> WebSocketStream<S> {
/// Convert a raw socket into a WebSocketStream without performing a
/// handshake.
pub fn from_raw_socket(stream: S, role: Role) -> Self {
let ws = WebSocket::from_raw_socket(stream, role);
pub fn from_raw_socket(
stream: S,
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
let ws = WebSocket::from_raw_socket(stream, role, config);
WebSocketStream { inner: ws }
}
/// Convert a raw socket into a WebSocketStream without performing a
/// handshake.
pub fn from_partially_read(stream: S, part: Vec<u8>, role: Role) -> Self {
let ws = WebSocket::from_partially_read(stream, part, role);
pub fn from_partially_read(
stream: S,
part: Vec<u8>,
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
let ws = WebSocket::from_partially_read(stream, part, role, config);
WebSocketStream { inner: ws }
}
}
@ -143,8 +199,7 @@ impl<T> Sink for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
type SinkError = WsError;
fn start_send(&mut self, item: Message) -> StartSend<Message, WsError> {
try!(self.inner.write_message(item).to_async());
Ok(AsyncSink::Ready)
self.inner.write_message(item).to_start_send()
}
fn poll_complete(&mut self) -> Poll<(), WsError> {
@ -238,3 +293,24 @@ impl<T> ToAsync for Result<T, WsError> {
}
}
trait ToStartSend {
type T;
type E;
fn to_start_send(self) -> StartSend<Self::T, Self::E>;
}
impl ToStartSend for Result<(), WsError> {
type T = Message;
type E = WsError;
fn to_start_send(self) -> StartSend<Self::T, Self::E> {
match self {
Ok(_) => Ok(AsyncSink::Ready),
Err(error) => match error {
WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::Ready),
WsError::SendQueueFull(msg) => Ok(AsyncSink::NotReady(msg)),
err => Err(err),
}
}
}
}

Loading…
Cancel
Save