You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							203 lines
						
					
					
						
							6.5 KiB
						
					
					
				
			
		
		
	
	
							203 lines
						
					
					
						
							6.5 KiB
						
					
					
				| //! `gio` integration.
 | |
| use ng_tungstenite::Error;
 | |
| 
 | |
| use std::io;
 | |
| 
 | |
| use gio::prelude::*;
 | |
| 
 | |
| use futures_io::{AsyncRead, AsyncWrite};
 | |
| 
 | |
| use ng_tungstenite::client::{uri_mode, IntoClientRequest};
 | |
| use ng_tungstenite::handshake::client::Request;
 | |
| use ng_tungstenite::handshake::server::{Callback, NoCallback};
 | |
| use ng_tungstenite::stream::Mode;
 | |
| 
 | |
| use crate::{client_async_with_config, domain, port, Response, WebSocketConfig, WebSocketStream};
 | |
| 
 | |
| /// Type alias for the stream type of the `connect_async()` functions.
 | |
| pub type ConnectStream = IOStreamAsyncReadWrite<gio::SocketConnection>;
 | |
| 
 | |
| /// Connect to a given URL.
 | |
| pub async fn connect_async<R>(
 | |
|     request: R,
 | |
| ) -> Result<(WebSocketStream<ConnectStream>, Response), Error>
 | |
| where
 | |
|     R: IntoClientRequest + Unpin,
 | |
| {
 | |
|     connect_async_with_config(request, None).await
 | |
| }
 | |
| 
 | |
| /// Connect to a given URL with a given WebSocket configuration.
 | |
| pub async fn connect_async_with_config<R>(
 | |
|     request: R,
 | |
|     config: Option<WebSocketConfig>,
 | |
| ) -> Result<(WebSocketStream<ConnectStream>, Response), Error>
 | |
| where
 | |
|     R: IntoClientRequest + Unpin,
 | |
| {
 | |
|     let request: Request = request.into_client_request()?;
 | |
| 
 | |
|     let domain = domain(&request)?;
 | |
|     let port = port(&request)?;
 | |
| 
 | |
|     let client = gio::SocketClient::new();
 | |
| 
 | |
|     // Make sure we check domain and mode first. URL must be valid.
 | |
|     let mode = uri_mode(request.uri())?;
 | |
|     if let Mode::Tls = mode {
 | |
|         client.set_tls(true);
 | |
|     } else {
 | |
|         client.set_tls(false);
 | |
|     }
 | |
| 
 | |
|     let connectable = gio::NetworkAddress::new(domain.as_str(), port);
 | |
| 
 | |
|     let socket = client
 | |
|         .connect_future(&connectable)
 | |
|         .await
 | |
|         .map_err(to_std_io_error)?;
 | |
|     let socket = IOStreamAsyncReadWrite::new(socket)
 | |
|         .map_err(|_| io::Error::new(io::ErrorKind::Other, "Unsupported gio::IOStream"))?;
 | |
| 
 | |
|     client_async_with_config(request, socket, config).await
 | |
| }
 | |
| 
 | |
| /// Accepts a new WebSocket connection with the provided stream.
 | |
| ///
 | |
| /// This function will internally call `server::accept` to create a
 | |
| /// handshake representation and returns a future representing the
 | |
| /// resolution of the WebSocket handshake. The returned future will resolve
 | |
| /// to either `WebSocketStream<S>` or `Error` depending if it's successful
 | |
| /// or not.
 | |
| ///
 | |
| /// This is typically used after a socket has been accepted from a
 | |
| /// `TcpListener`. That socket is then passed to this function to perform
 | |
| /// the server half of the accepting a client's websocket connection.
 | |
| pub async fn accept_async<S>(stream: S) -> Result<WebSocketStream<IOStreamAsyncReadWrite<S>>, Error>
 | |
| where
 | |
|     S: IsA<gio::IOStream> + Unpin,
 | |
| {
 | |
|     accept_hdr_async(stream, NoCallback).await
 | |
| }
 | |
| 
 | |
| /// The same as `accept_async()` but the one can specify a websocket configuration.
 | |
| /// Please refer to `accept_async()` for more details.
 | |
| pub async fn accept_async_with_config<S>(
 | |
|     stream: S,
 | |
|     config: Option<WebSocketConfig>,
 | |
| ) -> Result<WebSocketStream<IOStreamAsyncReadWrite<S>>, Error>
 | |
| where
 | |
|     S: IsA<gio::IOStream> + Unpin,
 | |
| {
 | |
|     accept_hdr_async_with_config(stream, NoCallback, config).await
 | |
| }
 | |
| 
 | |
| /// Accepts a new WebSocket connection with the provided stream.
 | |
| ///
 | |
| /// This function does the same as `accept_async()` but accepts an extra callback
 | |
| /// for header processing. The callback receives headers of the incoming
 | |
| /// requests and is able to add extra headers to the reply.
 | |
| pub async fn accept_hdr_async<S, C>(
 | |
|     stream: S,
 | |
|     callback: C,
 | |
| ) -> Result<WebSocketStream<IOStreamAsyncReadWrite<S>>, Error>
 | |
| where
 | |
|     S: IsA<gio::IOStream> + Unpin,
 | |
|     C: Callback + Unpin,
 | |
| {
 | |
|     accept_hdr_async_with_config(stream, callback, None).await
 | |
| }
 | |
| 
 | |
| /// The same as `accept_hdr_async()` but the one can specify a websocket configuration.
 | |
| /// Please refer to `accept_hdr_async()` for more details.
 | |
| pub async fn accept_hdr_async_with_config<S, C>(
 | |
|     stream: S,
 | |
|     callback: C,
 | |
|     config: Option<WebSocketConfig>,
 | |
| ) -> Result<WebSocketStream<IOStreamAsyncReadWrite<S>>, Error>
 | |
| where
 | |
|     S: IsA<gio::IOStream> + Unpin,
 | |
|     C: Callback + Unpin,
 | |
| {
 | |
|     let stream = IOStreamAsyncReadWrite::new(stream)
 | |
|         .map_err(|_| io::Error::new(io::ErrorKind::Other, "Unsupported gio::IOStream"))?;
 | |
| 
 | |
|     crate::accept_hdr_async_with_config(stream, callback, config).await
 | |
| }
 | |
| 
 | |
| /// Adapter for `gio::IOStream` to provide `AsyncRead` and `AsyncWrite`.
 | |
| #[derive(Debug)]
 | |
| pub struct IOStreamAsyncReadWrite<T: IsA<gio::IOStream>> {
 | |
|     #[allow(dead_code)]
 | |
|     io_stream: T,
 | |
|     read: gio::InputStreamAsyncRead<gio::PollableInputStream>,
 | |
|     write: gio::OutputStreamAsyncWrite<gio::PollableOutputStream>,
 | |
| }
 | |
| 
 | |
| unsafe impl<T: IsA<gio::IOStream>> Send for IOStreamAsyncReadWrite<T> {}
 | |
| 
 | |
| impl<T: IsA<gio::IOStream>> IOStreamAsyncReadWrite<T> {
 | |
|     /// Create a new `gio::IOStream` adapter
 | |
|     fn new(stream: T) -> Result<IOStreamAsyncReadWrite<T>, T> {
 | |
|         let write = stream
 | |
|             .output_stream()
 | |
|             .dynamic_cast::<gio::PollableOutputStream>()
 | |
|             .ok()
 | |
|             .and_then(|s| s.into_async_write().ok());
 | |
| 
 | |
|         let read = stream
 | |
|             .input_stream()
 | |
|             .dynamic_cast::<gio::PollableInputStream>()
 | |
|             .ok()
 | |
|             .and_then(|s| s.into_async_read().ok());
 | |
| 
 | |
|         let (read, write) = match (read, write) {
 | |
|             (Some(read), Some(write)) => (read, write),
 | |
|             _ => return Err(stream),
 | |
|         };
 | |
| 
 | |
|         Ok(IOStreamAsyncReadWrite {
 | |
|             io_stream: stream,
 | |
|             read,
 | |
|             write,
 | |
|         })
 | |
|     }
 | |
| }
 | |
| 
 | |
| use std::pin::Pin;
 | |
| use std::task::{Context, Poll};
 | |
| 
 | |
| impl<T: IsA<gio::IOStream> + Unpin> AsyncRead for IOStreamAsyncReadWrite<T> {
 | |
|     fn poll_read(
 | |
|         self: Pin<&mut Self>,
 | |
|         cx: &mut Context<'_>,
 | |
|         buf: &mut [u8],
 | |
|     ) -> Poll<Result<usize, io::Error>> {
 | |
|         Pin::new(&mut Pin::get_mut(self).read).poll_read(cx, buf)
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<T: IsA<gio::IOStream> + Unpin> AsyncWrite for IOStreamAsyncReadWrite<T> {
 | |
|     fn poll_write(
 | |
|         self: Pin<&mut Self>,
 | |
|         cx: &mut Context<'_>,
 | |
|         buf: &[u8],
 | |
|     ) -> Poll<Result<usize, io::Error>> {
 | |
|         Pin::new(&mut Pin::get_mut(self).write).poll_write(cx, buf)
 | |
|     }
 | |
| 
 | |
|     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
 | |
|         Pin::new(&mut Pin::get_mut(self).write).poll_close(cx)
 | |
|     }
 | |
| 
 | |
|     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
 | |
|         Pin::new(&mut Pin::get_mut(self).write).poll_flush(cx)
 | |
|     }
 | |
| }
 | |
| 
 | |
| fn to_std_io_error(error: glib::Error) -> io::Error {
 | |
|     match error.kind::<gio::IOErrorEnum>() {
 | |
|         Some(io_error_enum) => io::Error::new(io_error_enum.into(), error),
 | |
|         None => io::Error::new(io::ErrorKind::Other, error),
 | |
|     }
 | |
| }
 | |
| 
 |