Migrate from pin-project to pin-project-lite

pull/77/head
Constantin Nickel 4 years ago committed by Sebastian Dröge
parent 33e3722ab0
commit ab28b2d367
  1. 2
      Cargo.toml
  2. 47
      src/tokio.rs
  3. 2
      src/tokio/dummy_tls.rs
  4. 4
      src/tokio/native_tls.rs
  5. 4
      src/tokio/openssl.rs
  6. 4
      src/tokio/rustls.rs

@ -30,7 +30,7 @@ features = ["async-std-runtime", "tokio-runtime", "gio-runtime", "async-tls", "a
log = "0.4" log = "0.4"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
futures-io = { version = "0.3", default-features = false, features = ["std"] } futures-io = { version = "0.3", default-features = false, features = ["std"] }
pin-project = "1" pin-project-lite = "0.2"
[dependencies.tungstenite] [dependencies.tungstenite]
version = "0.11.0" version = "0.11.0"

@ -103,7 +103,7 @@ where
R: IntoClientRequest + Unpin, R: IntoClientRequest + Unpin,
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{ {
crate::client_async_with_config(request, TokioAdapter(stream), config).await crate::client_async_with_config(request, TokioAdapter::new(stream), config).await
} }
/// Accepts a new WebSocket connection with the provided stream. /// Accepts a new WebSocket connection with the provided stream.
@ -163,7 +163,7 @@ where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
C: Callback + Unpin, C: Callback + Unpin,
{ {
crate::accept_hdr_async_with_config(TokioAdapter(stream), callback, config).await crate::accept_hdr_async_with_config(TokioAdapter::new(stream), callback, config).await
} }
/// Type alias for the stream type of the `client_async()` functions. /// Type alias for the stream type of the `client_async()` functions.
@ -379,15 +379,30 @@ where
client_async_tls_with_connector_and_config(request, socket, connector, config).await client_async_tls_with_connector_and_config(request, socket, connector, config).await
} }
use pin_project::pin_project;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
/// Adapter for `tokio::io::AsyncRead` and `tokio::io::AsyncWrite` to provide pin_project_lite::pin_project! {
/// the variants from the `futures` crate and the other way around. /// Adapter for `tokio::io::AsyncRead` and `tokio::io::AsyncWrite` to provide
#[pin_project] /// the variants from the `futures` crate and the other way around.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TokioAdapter<T>(#[pin] pub T); pub struct TokioAdapter<T> {
#[pin]
inner: T,
}
}
impl<T> TokioAdapter<T> {
/// Creates a new `TokioAdapter` wrapping the provided value.
pub fn new(inner: T) -> Self {
Self { inner }
}
/// Consumes this `TokioAdapter`, returning the underlying value.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T: tokio::io::AsyncRead> AsyncRead for TokioAdapter<T> { impl<T: tokio::io::AsyncRead> AsyncRead for TokioAdapter<T> {
fn poll_read( fn poll_read(
@ -396,7 +411,7 @@ impl<T: tokio::io::AsyncRead> AsyncRead for TokioAdapter<T> {
buf: &mut [u8], buf: &mut [u8],
) -> Poll<std::io::Result<usize>> { ) -> Poll<std::io::Result<usize>> {
let mut buf = tokio::io::ReadBuf::new(buf); let mut buf = tokio::io::ReadBuf::new(buf);
match self.project().0.poll_read(cx, &mut buf)? { match self.project().inner.poll_read(cx, &mut buf)? {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Ok(buf.filled().len())), Poll::Ready(_) => Poll::Ready(Ok(buf.filled().len())),
} }
@ -409,15 +424,15 @@ impl<T: tokio::io::AsyncWrite> AsyncWrite for TokioAdapter<T> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> { ) -> Poll<Result<usize, std::io::Error>> {
self.project().0.poll_write(cx, buf) self.project().inner.poll_write(cx, buf)
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self.project().0.poll_flush(cx) self.project().inner.poll_flush(cx)
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self.project().0.poll_shutdown(cx) self.project().inner.poll_shutdown(cx)
} }
} }
@ -428,7 +443,7 @@ impl<T: AsyncRead> tokio::io::AsyncRead for TokioAdapter<T> {
buf: &mut tokio::io::ReadBuf<'_>, buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
let slice = buf.initialize_unfilled(); let slice = buf.initialize_unfilled();
let n = match self.project().0.poll_read(cx, slice)? { let n = match self.project().inner.poll_read(cx, slice)? {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(n) => n, Poll::Ready(n) => n,
}; };
@ -443,17 +458,17 @@ impl<T: AsyncWrite> tokio::io::AsyncWrite for TokioAdapter<T> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> { ) -> Poll<Result<usize, std::io::Error>> {
self.project().0.poll_write(cx, buf) self.project().inner.poll_write(cx, buf)
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
self.project().0.poll_flush(cx) self.project().inner.poll_flush(cx)
} }
fn poll_shutdown( fn poll_shutdown(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> { ) -> Poll<Result<(), std::io::Error>> {
self.project().0.poll_close(cx) self.project().inner.poll_close(cx)
} }
} }

@ -21,7 +21,7 @@ where
S: 'static + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, S: 'static + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{ {
match mode { match mode {
Mode::Plain => Ok(TokioAdapter(socket)), Mode::Plain => Ok(TokioAdapter::new(socket)),
Mode::Tls => Err(Error::Url("TLS support not compiled in.".into())), Mode::Tls => Err(Error::Url("TLS support not compiled in.".into())),
} }
} }

@ -28,7 +28,7 @@ where
S: 'static + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, S: 'static + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{ {
match mode { match mode {
Mode::Plain => Ok(StreamSwitcher::Plain(TokioAdapter(socket))), Mode::Plain => Ok(StreamSwitcher::Plain(TokioAdapter::new(socket))),
Mode::Tls => { Mode::Tls => {
let stream = { let stream = {
let connector = if let Some(connector) = connector { let connector = if let Some(connector) = connector {
@ -42,7 +42,7 @@ where
.await .await
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))? .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?
}; };
Ok(StreamSwitcher::Tls(TokioAdapter(stream))) Ok(StreamSwitcher::Tls(TokioAdapter::new(stream)))
} }
} }
} }

@ -34,7 +34,7 @@ where
+ Sync, + Sync,
{ {
match mode { match mode {
Mode::Plain => Ok(StreamSwitcher::Plain(TokioAdapter(socket))), Mode::Plain => Ok(StreamSwitcher::Plain(TokioAdapter::new(socket))),
Mode::Tls => { Mode::Tls => {
let stream = { let stream = {
let connector = if let Some(connector) = connector { let connector = if let Some(connector) = connector {
@ -54,7 +54,7 @@ where
TlsStream::new(ssl, socket) TlsStream::new(ssl, socket)
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))? .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?
}; };
Ok(StreamSwitcher::Tls(TokioAdapter(stream))) Ok(StreamSwitcher::Tls(TokioAdapter::new(stream)))
} }
} }
} }

@ -29,7 +29,7 @@ where
S: 'static + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, S: 'static + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{ {
match mode { match mode {
Mode::Plain => Ok(StreamSwitcher::Plain(TokioAdapter(socket))), Mode::Plain => Ok(StreamSwitcher::Plain(TokioAdapter::new(socket))),
Mode::Tls => { Mode::Tls => {
let stream = { let stream = {
let connector = if let Some(connector) = connector { let connector = if let Some(connector) = connector {
@ -48,7 +48,7 @@ where
.await .await
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))? .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?
}; };
Ok(StreamSwitcher::Tls(TokioAdapter(stream))) Ok(StreamSwitcher::Tls(TokioAdapter::new(stream)))
} }
} }
} }

Loading…
Cancel
Save