Instead simply send an owned Closed message. This simplifies the code
and among other things also handles errors like WouldBlock correctly
instead of handling them like a real error.
The same functionality is already provided via StreamExt::send() and
unlike the custom implementation it handles WouldBlock correctly and not
as an error.
As a side effect also gets rid of unsafe code and raw pointers.
We have the problem that external read operations (i.e. the Stream impl)
can trigger both read (AsyncRead) and write (AsyncWrite) operations on
the underyling stream. At the same time write operations (i.e. the Sink
impl) can trigger write operations (AsyncWrite) too.
Both the Stream and the Sink can be used on two different tasks, but it
is required that AsyncRead and AsyncWrite are only ever used by a single
task (or better: with a single waker) at a time.
Doing otherwise would cause only the latest waker to be remembered, so
in our case either the Stream or the Sink impl would potentially wait
forever to be woken up because only the other one would've been woken
up.
To solve this we implement a waker proxy that has two slots (one for
read, one for write) to store wakers. One waker proxy is always passed
to the AsyncRead, the other to AsyncWrite so that they will only ever
have to store a single waker, but internally we dispatch any wakeups to
up to two actual wakers (one from the Sink impl and one from the Stream
impl).
Connecting to a URL wraps the connection into this wrapper type based
on the protocol of the URL (`ws:` or `wss:`). Making this type public
allows client applications to specify the websocket's type without
importing the tokio_tls crate.