Port/rename to async-std / async-tungstenite

pull/1/head
Sebastian Dröge 5 years ago
parent 0c2779965d
commit b76d8cbc03
  1. 4
      .travis.yml
  2. 42
      Cargo.toml
  3. 1
      LICENSE
  4. 25
      README.md
  5. 9
      examples/autobahn-client.rs
  6. 15
      examples/autobahn-server.rs
  7. 26
      examples/client.rs
  8. 20
      examples/server.rs
  9. 27
      examples/split-client.rs
  10. 2
      src/compat.rs
  11. 32
      src/connect.rs
  12. 2
      src/handshake.rs
  13. 22
      src/lib.rs
  14. 10
      src/stream.rs
  15. 21
      tests/communication.rs
  16. 12
      tests/handshakes.rs

@ -1,6 +1,8 @@
language: rust
rust:
- nightly-2019-09-05
- stable
- beta
- nightly
before_script:
- export PATH="$PATH:$HOME/.cargo/bin"

@ -1,30 +1,29 @@
[package]
name = "tokio-tungstenite"
description = "Tokio binding for Tungstenite, the Lightweight stream-based WebSocket implementation"
name = "async-tungstenite"
description = "async-std binding for Tungstenite, the Lightweight stream-based WebSocket implementation"
categories = ["web-programming::websocket", "network-programming", "asynchronous", "concurrency"]
keywords = ["websocket", "io", "web"]
authors = ["Daniel Abramov <dabramov@snapview.de>", "Alexey Galakhov <agalakhov@snapview.de>"]
authors = ["Sebastian Dröge <sebastian@centricular.com>"]
license = "MIT"
homepage = "https://github.com/snapview/tokio-tungstenite"
documentation = "https://docs.rs/tokio-tungstenite/0.9.0"
repository = "https://github.com/snapview/tokio-tungstenite"
version = "0.10.0-alpha.1"
homepage = "https://github.com/sdroege/async-tungstenite"
repository = "https://github.com/sdroege/async-tungstenite"
version = "0.1.0"
edition = "2018"
[features]
default = ["connect", "tls"]
connect = ["tokio-net", "stream"]
tls = ["tokio-tls", "native-tls", "stream", "tungstenite/tls"]
default = ["connect", "tls", "async_std_runtime"]
connect = ["stream"]
async_std_runtime = ["connect", "tls", "async-std"]
tls = ["async-tls", "stream"]
stream = ["bytes"]
[dependencies]
log = "0.4"
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }
futures = { version = "0.3", features = ["async-await"] }
pin-project = "0.4.0-alpha.9"
tokio-io = "0.2.0-alpha.6"
[dependencies.tungstenite]
#version = "0.9.1"
#version = "0.9.2"
git = "https://github.com/snapview/tungstenite-rs.git"
branch = "master"
default-features = false
@ -33,20 +32,15 @@ default-features = false
optional = true
version = "0.4.8"
[dependencies.native-tls]
[dependencies.async-std]
optional = true
version = "0.2.0"
version = "1.0"
[dependencies.tokio-net]
[dependencies.async-tls]
optional = true
version = "0.2.0-alpha.6"
features = ["tcp"]
[dependencies.tokio-tls]
optional = true
version = "0.3.0-alpha.6"
version = "0.6.0"
[dev-dependencies]
tokio = "0.2.0-alpha.6"
url = "2.0.0"
env_logger = "0.6.1"
env_logger = "0.7"
async-std = { version = "1.0", features = ["attributes"] }

@ -1,5 +1,6 @@
Copyright (c) 2017 Daniel Abramov
Copyright (c) 2017 Alexey Galakhov
Copyright (c) 2019 Sebastian Dröge
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

@ -1,12 +1,12 @@
# tokio-tungstenite
# async-tungstenite
Asynchronous WebSockets for Tokio stack.
Asynchronous WebSockets for [async-std](https://async.rs) and `std` `Future`s.
[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](./LICENSE)
[![Crates.io](https://img.shields.io/crates/v/tokio-tungstenite.svg?maxAge=2592000)](https://crates.io/crates/tokio-tungstenite)
[![Build Status](https://travis-ci.org/snapview/tokio-tungstenite.svg?branch=master)](https://travis-ci.org/snapview/tokio-tungstenite)
[![Crates.io](https://img.shields.io/crates/v/async-tungstenite.svg?maxAge=2592000)](https://crates.io/crates/async-tungstenite)
[![Build Status](https://travis-ci.org/sdroege/async-tungstenite.svg?branch=master)](https://travis-ci.org/sdroege/async-tungstenite)
[Documentation](https://docs.rs/tokio-tungstenite)
[Documentation](https://docs.rs/async-tungstenite)
## Usage
@ -14,13 +14,18 @@ Add this in your `Cargo.toml`:
```toml
[dependencies]
tokio-tungstenite = "*"
async-tungstenite = "*"
```
Take a look at the `examples/` directory for client and server examples. You may also want to get familiar with
[tokio](https://tokio.rs/) if you don't have any experience with it.
[`async-std`](https://async.rs/) if you don't have any experience with it.
## What is tokio-tungstenite?
## What is async-tungstenite?
This crate is based on `tungstenite-rs` Rust WebSocket library and provides `tokio` bindings and wrappers for it, so you
can use it with non-blocking/asynchronous `TcpStream`s from and couple it together with other crates from `tokio` stack.
This crate is based on `tungstenite-rs` Rust WebSocket library and provides async-std bindings and wrappers for it, so you
can use it with non-blocking/asynchronous `TcpStream`s from and couple it together with other crates from the async-std stack.
## tokio-tungstenite
Originally this crate was created as a fork of [tokio-tungstenite](https://github.com/snapview/tokio-tungstenite)
and ported to [async-std](https://async.rs).

@ -1,6 +1,6 @@
use futures::StreamExt;
use log::*;
use tokio_tungstenite::{connect_async, tungstenite::Result};
use async_tungstenite::{connect_async, tungstenite::Result};
use url::Url;
const AGENT: &'static str = "Tungstenite";
@ -43,8 +43,7 @@ async fn run_test(case: u32) {
}
}
#[tokio::main]
async fn main() {
async fn run() {
env_logger::init();
let total = get_case_count().await.unwrap();
@ -55,3 +54,7 @@ async fn main() {
update_reports().await.unwrap();
}
fn main() {
async_std::task::block_on(run());
}

@ -1,8 +1,7 @@
use futures::StreamExt;
use log::*;
use std::net::{SocketAddr, ToSocketAddrs};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::accept_async;
use async_std::net::{TcpListener, TcpStream, SocketAddr, ToSocketAddrs};
use async_tungstenite::accept_async;
async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
@ -17,12 +16,12 @@ async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
}
}
#[tokio::main]
async fn main() {
async fn run() {
env_logger::init();
let addr = "127.0.0.1:9002"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("Not a socket address");
@ -37,6 +36,10 @@ async fn main() {
.expect("connected streams should have a peer address");
info!("Peer address: {}", peer);
tokio::spawn(accept_connection(peer, stream));
async_std::task::spawn(accept_connection(peer, stream));
}
}
fn main() {
async_std::task::block_on(run());
}

@ -11,17 +11,17 @@
//! You can use this example together with the `server` example.
use std::env;
use std::io::{self, Write};
use futures::StreamExt;
use log::*;
use tungstenite::protocol::Message;
use tokio::io::AsyncReadExt;
use tokio_tungstenite::connect_async;
use async_std::prelude::*;
use async_std::io;
use async_std::task;
use async_tungstenite::connect_async;
#[tokio::main]
async fn main() {
async fn run() {
let _ = env_logger::try_init();
// Specify the server address to which the client will be connecting.
@ -31,12 +31,10 @@ async fn main() {
let url = url::Url::parse(&connect_addr).unwrap();
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data from stdin and then send it to the event loop over a standard
// futures channel.
// Spawn a new task that will will read data from stdin and then send it to the event loop over
// a standard futures channel.
let (stdin_tx, mut stdin_rx) = futures::channel::mpsc::unbounded();
tokio::spawn(read_stdin(stdin_tx));
task::spawn(read_stdin(stdin_tx));
// After the TCP connection has been established, we set up our client to
// start forwarding data.
@ -61,7 +59,7 @@ async fn main() {
ws_stream.send(msg).await.expect("Failed to send request");
if let Some(msg) = ws_stream.next().await {
let msg = msg.expect("Failed to get response");
stdout.write_all(&msg.into_data()).unwrap();
stdout.write_all(&msg.into_data()).await.unwrap();
}
}
}
@ -69,7 +67,7 @@ async fn main() {
// Our helper method which will read data from stdin and send it along the
// sender provided.
async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
let mut stdin = tokio::io::stdin();
let mut stdin = io::stdin();
loop {
let mut buf = vec![0; 1024];
let n = match stdin.read(&mut buf).await {
@ -80,3 +78,7 @@ async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
tx.unbounded_send(Message::binary(buf)).unwrap();
}
}
fn main() {
task::block_on(run())
}

@ -23,8 +23,9 @@ use std::io::Error;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::StreamExt;
use log::*;
use std::net::{SocketAddr, ToSocketAddrs};
use tokio::net::{TcpListener, TcpStream};
use async_std::task;
use async_std::net::{SocketAddr, ToSocketAddrs};
use async_std::net::{TcpListener, TcpStream};
use tungstenite::protocol::Message;
struct Connection {
@ -50,7 +51,7 @@ async fn accept_connection(stream: TcpStream) {
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let mut ws_stream = tokio_tungstenite::accept_async(stream)
let mut ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
@ -66,7 +67,7 @@ async fn accept_connection(stream: TcpStream) {
rx: msg_rx,
tx: response_tx,
};
tokio::spawn(handle_connection(c));
task::spawn(handle_connection(c));
while let Some(message) = ws_stream.next().await {
let message = message.expect("Failed to get request");
@ -78,12 +79,13 @@ async fn accept_connection(stream: TcpStream) {
}
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
async fn run() -> Result<(), Error> {
let _ = env_logger::try_init();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("Not a socket address");
@ -96,8 +98,12 @@ async fn main() -> Result<(), Error> {
while let Some(stream) = incoming.next().await {
let stream = stream.expect("Failed to accept stream");
tokio::spawn(accept_connection(stream));
task::spawn(accept_connection(stream));
}
Ok(())
}
fn main() -> Result<(), Error> {
task::block_on(run())
}

@ -11,17 +11,16 @@
//! You can use this example together with the `server` example.
use std::env;
use std::io::{self, Write};
use futures::{SinkExt, StreamExt};
use log::*;
use tungstenite::protocol::Message;
use async_std::prelude::*;
use async_std::io;
use async_std::task;
use async_tungstenite::connect_async;
use tokio::io::AsyncReadExt;
use tokio_tungstenite::connect_async;
#[tokio::main]
async fn main() {
async fn run() {
let _ = env_logger::try_init();
// Specify the server address to which the client will be connecting.
@ -31,12 +30,10 @@ async fn main() {
let url = url::Url::parse(&connect_addr).unwrap();
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data from stdin and then send it to the event loop over a standard
// futures channel.
// Spawn a new task that will read data from stdin and then send it to the event loop over a
// standard futures channel.
let (stdin_tx, mut stdin_rx) = futures::channel::mpsc::unbounded();
tokio::spawn(read_stdin(stdin_tx));
task::spawn(read_stdin(stdin_tx));
// After the TCP connection has been established, we set up our client to
// start forwarding data.
@ -62,7 +59,7 @@ async fn main() {
ws_tx.send(msg).await.expect("Failed to send request");
if let Some(msg) = ws_rx.next().await {
let msg = msg.expect("Failed to get response");
stdout.write_all(&msg.into_data()).unwrap();
stdout.write_all(&msg.into_data()).await.unwrap();
}
}
}
@ -70,7 +67,7 @@ async fn main() {
// Our helper method which will read data from stdin and send it along the
// sender provided.
async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
let mut stdin = tokio::io::stdin();
let mut stdin = io::stdin();
loop {
let mut buf = vec![0; 1024];
let n = match stdin.read(&mut buf).await {
@ -81,3 +78,7 @@ async fn read_stdin(tx: futures::channel::mpsc::UnboundedSender<Message>) {
tx.unbounded_send(Message::binary(buf)).unwrap();
}
}
fn main() {
task::block_on(run())
}

@ -3,7 +3,7 @@ use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
use tungstenite::{Error as WsError, WebSocket};
pub(crate) trait HasContext {

@ -1,23 +1,22 @@
//! Connection helper.
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_net::tcp::TcpStream;
use tungstenite::client::url_mode;
use tungstenite::handshake::client::Response;
use tungstenite::Error;
use futures::io::{AsyncRead, AsyncWrite};
use super::{client_async, Request, WebSocketStream};
#[cfg(feature = "tls")]
pub(crate) mod encryption {
use native_tls::TlsConnector;
use tokio_tls::{TlsConnector as TokioTlsConnector, TlsStream};
use tokio_io::{AsyncRead, AsyncWrite};
use async_tls::TlsConnector as AsyncTlsConnector;
use async_tls::client::TlsStream;
use tungstenite::stream::Mode;
use tungstenite::Error;
use futures::io::{AsyncRead, AsyncWrite};
use crate::stream::Stream as StreamSwitcher;
/// A stream that might be protected with TLS.
@ -36,12 +35,10 @@ pub(crate) mod encryption {
match mode {
Mode::Plain => Ok(StreamSwitcher::Plain(socket)),
Mode::Tls => {
let try_connector = TlsConnector::new();
let connector = try_connector.map_err(Error::Tls)?;
let stream = TokioTlsConnector::from(connector);
let connected = stream.connect(&domain, socket).await;
let stream = AsyncTlsConnector::new();
let connected = stream.connect(&domain, socket)?.await;
match connected {
Err(e) => Err(Error::Tls(e)),
Err(e) => Err(Error::Io(e)),
Ok(s) => Ok(StreamSwitcher::Tls(s)),
}
}
@ -55,7 +52,7 @@ pub use self::encryption::MaybeTlsStream;
#[cfg(not(feature = "tls"))]
pub(crate) mod encryption {
use futures::{future, Future};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
use tungstenite::stream::Mode;
use tungstenite::Error;
@ -110,6 +107,11 @@ where
client_async(request, stream).await
}
#[cfg(feature = "async_std_runtime")]
pub(crate) mod async_std_runtime {
use super::*;
use async_std::net::TcpStream;
/// Connect to a given URL.
pub async fn connect_async<R>(
request: R,
@ -129,3 +131,7 @@ where
let socket = try_socket.map_err(Error::Io)?;
client_async_tls(request, socket).await
}
}
#[cfg(feature = "async_std_runtime")]
pub use async_std_runtime::connect_async;

@ -6,7 +6,7 @@ use std::future::Future;
use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
use tungstenite::handshake::client::Response;
use tungstenite::handshake::server::Callback;
use tungstenite::handshake::{HandshakeError as Error, HandshakeRole, MidHandshake as WsHandshake};

@ -2,7 +2,7 @@
//!
//! This library is an implementation of WebSocket handshakes and streams. It
//! is based on the crate which implements all required WebSocket protocol
//! logic. So this crate basically just brings tokio support / tokio integration
//! logic. So this crate basically just brings async_std support / async_std integration
//! to it.
//!
//! Each WebSocket stream implements the required `Stream` and `Sink` traits,
@ -34,7 +34,7 @@ use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
use tungstenite::{
error::Error as WsError,
@ -47,7 +47,9 @@ use tungstenite::{
};
#[cfg(feature = "connect")]
pub use connect::{client_async_tls, connect_async};
pub use connect::client_async_tls;
#[cfg(feature = "async_std_runtime")]
pub use connect::connect_async;
#[cfg(all(feature = "connect", feature = "tls"))]
pub use connect::MaybeTlsStream;
@ -382,7 +384,7 @@ mod tests {
use crate::connect::encryption::AutoStream;
use crate::WebSocketStream;
use std::io::{Read, Write};
use tokio_io::{AsyncReadExt, AsyncWriteExt};
use futures::io::{AsyncReadExt, AsyncWriteExt};
fn is_read<T: Read>() {}
fn is_write<T: Write>() {}
@ -392,13 +394,13 @@ mod tests {
#[test]
fn web_socket_stream_has_traits() {
is_read::<AllowStd<tokio::net::TcpStream>>();
is_write::<AllowStd<tokio::net::TcpStream>>();
is_read::<AllowStd<async_std::net::TcpStream>>();
is_write::<AllowStd<async_std::net::TcpStream>>();
is_async_read::<AutoStream<tokio::net::TcpStream>>();
is_async_write::<AutoStream<tokio::net::TcpStream>>();
is_async_read::<AutoStream<async_std::net::TcpStream>>();
is_async_write::<AutoStream<async_std::net::TcpStream>>();
is_unpin::<WebSocketStream<tokio::net::TcpStream>>();
is_unpin::<WebSocketStream<AutoStream<tokio::net::TcpStream>>>();
is_unpin::<WebSocketStream<async_std::net::TcpStream>>();
is_unpin::<WebSocketStream<AutoStream<async_std::net::TcpStream>>>();
}
}

@ -2,11 +2,11 @@
//!
//! There is no dependency on actual TLS implementations. Everything like
//! `native_tls` or `openssl` will work as long as there is a TLS stream supporting standard
//! `Read + Write` traits.
//! `AsyncRead + AsyncWrite` traits.
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
/// Stream, either plain TCP or TLS.
pub enum Stream<S, T> {
@ -69,18 +69,18 @@ impl<S: AsyncWrite + Unpin, T: AsyncWrite + Unpin> AsyncWrite for Stream<S, T> {
}
}
fn poll_shutdown(
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
match *self {
Stream::Plain(ref mut s) => {
let pinned = unsafe { Pin::new_unchecked(s) };
pinned.poll_shutdown(cx)
pinned.poll_close(cx)
}
Stream::Tls(ref mut s) => {
let pinned = unsafe { Pin::new_unchecked(s) };
pinned.poll_shutdown(cx)
pinned.poll_close(cx)
}
}
}

@ -1,9 +1,8 @@
use futures::{SinkExt, StreamExt};
use futures::{SinkExt, StreamExt, AsyncRead, AsyncWrite};
use log::*;
use std::net::ToSocketAddrs;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::tcp::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, client_async, WebSocketStream};
use async_std::task;
use async_std::net::{TcpListener, TcpStream, ToSocketAddrs};
use async_tungstenite::{accept_async, client_async, WebSocketStream};
use tungstenite::Message;
async fn run_connection<S>(
@ -23,7 +22,7 @@ async fn run_connection<S>(
msg_tx.send(messages).expect("Failed to send results");
}
#[tokio::test]
#[async_std::test]
async fn communication() {
let _ = env_logger::try_init();
@ -33,6 +32,7 @@ async fn communication() {
let f = async move {
let address = "0.0.0.0:12345"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("No address resolved");
@ -48,13 +48,14 @@ async fn communication() {
run_connection(stream, msg_tx).await;
};
tokio::spawn(f);
task::spawn(f);
info!("Waiting for server to be ready");
con_rx.await.expect("Server not ready");
let address = "0.0.0.0:12345"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("No address resolved");
@ -78,7 +79,7 @@ async fn communication() {
assert_eq!(messages.len(), 10);
}
#[tokio::test]
#[async_std::test]
async fn split_communication() {
let _ = env_logger::try_init();
@ -88,6 +89,7 @@ async fn split_communication() {
let f = async move {
let address = "0.0.0.0:12346"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("No address resolved");
@ -103,13 +105,14 @@ async fn split_communication() {
run_connection(stream, msg_tx).await;
};
tokio::spawn(f);
task::spawn(f);
info!("Waiting for server to be ready");
con_rx.await.expect("Server not ready");
let address = "0.0.0.0:12346"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("No address resolved");

@ -1,15 +1,16 @@
use futures::StreamExt;
use std::net::ToSocketAddrs;
use tokio::net::tcp::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, client_async};
use async_std::task;
use async_std::net::{TcpListener, TcpStream, ToSocketAddrs};
use async_tungstenite::{accept_async, client_async};
#[tokio::test]
#[async_std::test]
async fn handshakes() {
let (tx, rx) = futures::channel::oneshot::channel();
let f = async move {
let address = "0.0.0.0:12345"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("No address resolved");
@ -23,11 +24,12 @@ async fn handshakes() {
}
};
tokio::spawn(f);
task::spawn(f);
rx.await.expect("Failed to wait for server to be ready");
let address = "0.0.0.0:12345"
.to_socket_addrs()
.await
.expect("Not a valid address")
.next()
.expect("No address resolved");

Loading…
Cancel
Save