remove trace level logging

pull/211/head
qiujiangkun 3 years ago
parent cd79500d25
commit 37acfb3654
  1. 62
      src/handshake/client.rs
  2. 46
      src/handshake/machine.rs
  3. 32
      src/handshake/server.rs
  4. 58
      src/protocol/frame/frame.rs
  5. 36
      src/protocol/frame/mod.rs
  6. 47
      src/protocol/mod.rs

@ -61,11 +61,18 @@ impl<S: Read + Write> ClientHandshake<S> {
let client = {
let accept_key = derive_accept_key(key.as_ref());
ClientHandshake { verify_data: VerifyData { accept_key }, config, _marker: PhantomData }
ClientHandshake {
verify_data: VerifyData { accept_key },
config,
_marker: PhantomData,
}
};
trace!("Client handshake initiated.");
Ok(MidHandshake { role: client, machine })
// trace!("Client handshake initiated.");
Ok(MidHandshake {
role: client,
machine,
})
}
}
@ -81,7 +88,11 @@ impl<S: Read + Write> HandshakeRole for ClientHandshake<S> {
StageResult::DoneWriting(stream) => {
ProcessingResult::Continue(HandshakeMachine::start_read(stream))
}
StageResult::DoneReading { stream, result, tail } => {
StageResult::DoneReading {
stream,
result,
tail,
} => {
let result = self.verify_data.verify_response(result)?;
debug!("Client handshake done.");
let websocket =
@ -97,7 +108,10 @@ fn generate_request(request: Request, key: &str) -> Result<Vec<u8>> {
let mut req = Vec::new();
let uri = request.uri();
let authority = uri.authority().ok_or(Error::Url(UrlError::NoHostName))?.as_str();
let authority = uri
.authority()
.ok_or(Error::Url(UrlError::NoHostName))?
.as_str();
let host = if let Some(idx) = authority.find('@') {
// handle possible name:password@
authority.split_at(idx + 1).1
@ -119,7 +133,10 @@ fn generate_request(request: Request, key: &str) -> Result<Vec<u8>> {
Sec-WebSocket-Key: {key}\r\n",
version = request.version(),
host = host,
path = uri.path_and_query().ok_or(Error::Url(UrlError::NoPathOrQuery))?.as_str(),
path = uri
.path_and_query()
.ok_or(Error::Url(UrlError::NoPathOrQuery))?
.as_str(),
key = key
)
.unwrap();
@ -140,7 +157,7 @@ fn generate_request(request: Request, key: &str) -> Result<Vec<u8>> {
writeln!(req, "{}: {}\r", k, v.to_str()?).unwrap();
}
writeln!(req, "\r").unwrap();
trace!("Request: {:?}", String::from_utf8_lossy(&req));
// trace!("Request: {:?}", String::from_utf8_lossy(&req));
Ok(req)
}
@ -171,7 +188,9 @@ impl VerifyData {
.map(|h| h.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
{
return Err(Error::Protocol(ProtocolError::MissingUpgradeWebSocketHeader));
return Err(Error::Protocol(
ProtocolError::MissingUpgradeWebSocketHeader,
));
}
// 3. If the response lacks a |Connection| header field or the
// |Connection| header field doesn't contain a token that is an
@ -183,14 +202,22 @@ impl VerifyData {
.map(|h| h.eq_ignore_ascii_case("Upgrade"))
.unwrap_or(false)
{
return Err(Error::Protocol(ProtocolError::MissingConnectionUpgradeHeader));
return Err(Error::Protocol(
ProtocolError::MissingConnectionUpgradeHeader,
));
}
// 4. If the response lacks a |Sec-WebSocket-Accept| header field or
// the |Sec-WebSocket-Accept| contains a value other than the
// base64-encoded SHA-1 of ... the client MUST _Fail the WebSocket
// Connection_. (RFC 6455)
if !headers.get("Sec-WebSocket-Accept").map(|h| h == &self.accept_key).unwrap_or(false) {
return Err(Error::Protocol(ProtocolError::SecWebSocketAcceptKeyMismatch));
if !headers
.get("Sec-WebSocket-Accept")
.map(|h| h == &self.accept_key)
.unwrap_or(false)
{
return Err(Error::Protocol(
ProtocolError::SecWebSocketAcceptKeyMismatch,
));
}
// 5. If the response includes a |Sec-WebSocket-Extensions| header
// field and this header field indicates the use of an extension
@ -288,7 +315,9 @@ mod tests {
#[test]
fn request_formatting_with_host() {
let request = "wss://localhost:9001/getCaseCount".into_client_request().unwrap();
let request = "wss://localhost:9001/getCaseCount"
.into_client_request()
.unwrap();
let key = "A70tsIbeMZUbJHh5BWFw6Q==";
let correct = b"\
GET /getCaseCount HTTP/1.1\r\n\
@ -305,7 +334,9 @@ mod tests {
#[test]
fn request_formatting_with_at() {
let request = "wss://user:pass@localhost:9001/getCaseCount".into_client_request().unwrap();
let request = "wss://user:pass@localhost:9001/getCaseCount"
.into_client_request()
.unwrap();
let key = "A70tsIbeMZUbJHh5BWFw6Q==";
let correct = b"\
GET /getCaseCount HTTP/1.1\r\n\
@ -325,6 +356,9 @@ mod tests {
const DATA: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n";
let (_, resp) = Response::try_parse(DATA).unwrap().unwrap();
assert_eq!(resp.status(), http::StatusCode::OK);
assert_eq!(resp.headers().get("Content-Type").unwrap(), &b"text/html"[..],);
assert_eq!(
resp.headers().get("Content-Type").unwrap(),
&b"text/html"[..],
);
}
}

@ -18,11 +18,17 @@ pub struct HandshakeMachine<Stream> {
impl<Stream> HandshakeMachine<Stream> {
/// Start reading data from the peer.
pub fn start_read(stream: Stream) -> Self {
HandshakeMachine { stream, state: HandshakeState::Reading(ReadBuffer::new()) }
HandshakeMachine {
stream,
state: HandshakeState::Reading(ReadBuffer::new()),
}
}
/// Start writing data to the peer.
pub fn start_write<D: Into<Vec<u8>>>(stream: Stream, data: D) -> Self {
HandshakeMachine { stream, state: HandshakeState::Writing(Cursor::new(data.into())) }
HandshakeMachine {
stream,
state: HandshakeState::Writing(Cursor::new(data.into())),
}
}
/// Returns a shared reference to the inner stream.
pub fn get_ref(&self) -> &Stream {
@ -37,25 +43,27 @@ impl<Stream> HandshakeMachine<Stream> {
impl<Stream: Read + Write> HandshakeMachine<Stream> {
/// Perform a single handshake round.
pub fn single_round<Obj: TryParse>(mut self) -> Result<RoundResult<Obj, Stream>> {
trace!("Doing handshake round.");
// trace!("Doing handshake round.");
match self.state {
HandshakeState::Reading(mut buf) => {
let read = buf.read_from(&mut self.stream).no_block()?;
match read {
Some(0) => Err(Error::Protocol(ProtocolError::HandshakeIncomplete)),
Some(_) => Ok(if let Some((size, obj)) = Obj::try_parse(Buf::chunk(&buf))? {
buf.advance(size);
RoundResult::StageFinished(StageResult::DoneReading {
result: obj,
stream: self.stream,
tail: buf.into_vec(),
})
} else {
RoundResult::Incomplete(HandshakeMachine {
state: HandshakeState::Reading(buf),
..self
})
}),
Some(_) => Ok(
if let Some((size, obj)) = Obj::try_parse(Buf::chunk(&buf))? {
buf.advance(size);
RoundResult::StageFinished(StageResult::DoneReading {
result: obj,
stream: self.stream,
tail: buf.into_vec(),
})
} else {
RoundResult::Incomplete(HandshakeMachine {
state: HandshakeState::Reading(buf),
..self
})
},
),
None => Ok(RoundResult::WouldBlock(HandshakeMachine {
state: HandshakeState::Reading(buf),
..self
@ -101,7 +109,11 @@ pub enum RoundResult<Obj, Stream> {
#[derive(Debug)]
pub enum StageResult<Obj, Stream> {
/// Reading round finished.
DoneReading { result: Obj, stream: Stream, tail: Vec<u8> },
DoneReading {
result: Obj,
stream: Stream,
tail: Vec<u8>,
},
/// Writing round finished.
DoneWriting(Stream),
}

@ -45,10 +45,15 @@ fn create_parts<T>(request: &HttpRequest<T>) -> Result<Builder> {
.headers()
.get("Connection")
.and_then(|h| h.to_str().ok())
.map(|h| h.split(|c| c == ' ' || c == ',').any(|p| p.eq_ignore_ascii_case("Upgrade")))
.map(|h| {
h.split(|c| c == ' ' || c == ',')
.any(|p| p.eq_ignore_ascii_case("Upgrade"))
})
.unwrap_or(false)
{
return Err(Error::Protocol(ProtocolError::MissingConnectionUpgradeHeader));
return Err(Error::Protocol(
ProtocolError::MissingConnectionUpgradeHeader,
));
}
if !request
@ -58,11 +63,20 @@ fn create_parts<T>(request: &HttpRequest<T>) -> Result<Builder> {
.map(|h| h.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
{
return Err(Error::Protocol(ProtocolError::MissingUpgradeWebSocketHeader));
return Err(Error::Protocol(
ProtocolError::MissingUpgradeWebSocketHeader,
));
}
if !request.headers().get("Sec-WebSocket-Version").map(|h| h == "13").unwrap_or(false) {
return Err(Error::Protocol(ProtocolError::MissingSecWebSocketVersionHeader));
if !request
.headers()
.get("Sec-WebSocket-Version")
.map(|h| h == "13")
.unwrap_or(false)
{
return Err(Error::Protocol(
ProtocolError::MissingSecWebSocketVersionHeader,
));
}
let key = request
@ -212,7 +226,7 @@ impl<S: Read + Write, C: Callback> ServerHandshake<S, C> {
/// server, you can specify the callback if you want to add additional header to the client
/// upon join based on the incoming headers.
pub fn start(stream: S, callback: C, config: Option<WebSocketConfig>) -> MidHandshake<Self> {
trace!("Server handshake initiated.");
// trace!("Server handshake initiated.");
MidHandshake {
machine: HandshakeMachine::start_read(stream),
role: ServerHandshake {
@ -235,7 +249,11 @@ impl<S: Read + Write, C: Callback> HandshakeRole for ServerHandshake<S, C> {
finish: StageResult<Self::IncomingData, Self::InternalStream>,
) -> Result<ProcessingResult<Self::InternalStream, Self::FinalResult>> {
Ok(match finish {
StageResult::DoneReading { stream, result, tail } => {
StageResult::DoneReading {
stream,
result,
tail,
} => {
if !tail.is_empty() {
return Err(Error::Protocol(ProtocolError::JunkAfterRequest));
}

@ -28,7 +28,10 @@ pub struct CloseFrame<'t> {
impl<'t> CloseFrame<'t> {
/// Convert into a owned string.
pub fn into_owned(self) -> CloseFrame<'static> {
CloseFrame { code: self.code, reason: self.reason.into_owned().into() }
CloseFrame {
code: self.code,
reason: self.reason.into_owned().into(),
}
}
}
@ -137,12 +140,12 @@ impl FrameHeader {
if cursor.read(&mut head)? != 2 {
return Ok(None);
}
trace!("Parsed headers {:?}", head);
// trace!("Parsed headers {:?}", head);
(head[0], head[1])
};
trace!("First: {:b}", first);
trace!("Second: {:b}", second);
// trace!("First: {:b}", first);
// trace!("Second: {:b}", second);
let is_final = first & 0x80 != 0;
@ -151,10 +154,10 @@ impl FrameHeader {
let rsv3 = first & 0x10 != 0;
let opcode = OpCode::from(first & 0x0F);
trace!("Opcode: {:?}", opcode);
// trace!("Opcode: {:?}", opcode);
let masked = second & 0x80 != 0;
trace!("Masked: {:?}", masked);
// trace!("Masked: {:?}", masked);
let length = {
let length_byte = second & 0x7F;
@ -193,7 +196,14 @@ impl FrameHeader {
_ => (),
}
let hdr = FrameHeader { is_final, rsv1, rsv2, rsv3, opcode, mask };
let hdr = FrameHeader {
is_final,
rsv1,
rsv2,
rsv3,
opcode,
mask,
};
Ok(Some((hdr, length)))
}
@ -298,7 +308,10 @@ impl Frame {
let code = NetworkEndian::read_u16(&data[0..2]).into();
data.drain(0..2);
let text = String::from_utf8(data)?;
Ok(Some(CloseFrame { code, reason: text.into() }))
Ok(Some(CloseFrame {
code,
reason: text.into(),
}))
}
}
}
@ -306,9 +319,19 @@ impl Frame {
/// Create a new data frame.
#[inline]
pub fn message(data: Vec<u8>, opcode: OpCode, is_final: bool) -> Frame {
debug_assert!(matches!(opcode, OpCode::Data(_)), "Invalid opcode for data frame.");
debug_assert!(
matches!(opcode, OpCode::Data(_)),
"Invalid opcode for data frame."
);
Frame { header: FrameHeader { is_final, opcode, ..FrameHeader::default() }, payload: data }
Frame {
header: FrameHeader {
is_final,
opcode,
..FrameHeader::default()
},
payload: data,
}
}
/// Create a new Pong control frame.
@ -347,7 +370,10 @@ impl Frame {
Vec::new()
};
Frame { header: FrameHeader::default(), payload }
Frame {
header: FrameHeader::default(),
payload,
}
}
/// Create a frame from given header and data.
@ -385,7 +411,10 @@ payload: 0x{}
// self.mask.map(|mask| format!("{:?}", mask)).unwrap_or("NONE".into()),
self.len(),
self.payload.len(),
self.payload.iter().map(|byte| format!("{:x}", byte)).collect::<String>()
self.payload
.iter()
.map(|byte| format!("{:x}", byte))
.collect::<String>()
)
}
}
@ -457,7 +486,10 @@ mod tests {
let mut payload = Vec::new();
raw.read_to_end(&mut payload).unwrap();
let frame = Frame::from_payload(header, payload);
assert_eq!(frame.into_data(), vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]);
assert_eq!(
frame.into_data(),
vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]
);
}
#[test]

@ -28,12 +28,18 @@ pub struct FrameSocket<Stream> {
impl<Stream> FrameSocket<Stream> {
/// Create a new frame socket.
pub fn new(stream: Stream) -> Self {
FrameSocket { stream, codec: FrameCodec::new() }
FrameSocket {
stream,
codec: FrameCodec::new(),
}
}
/// Create a new frame socket from partially read data.
pub fn from_partially_read(stream: Stream, part: Vec<u8>) -> Self {
FrameSocket { stream, codec: FrameCodec::from_partially_read(part) }
FrameSocket {
stream,
codec: FrameCodec::from_partially_read(part),
}
}
/// Extract a stream from the socket.
@ -95,7 +101,11 @@ pub(super) struct FrameCodec {
impl FrameCodec {
/// Create a new frame codec.
pub(super) fn new() -> Self {
Self { in_buffer: ReadBuffer::new(), out_buffer: Vec::new(), header: None }
Self {
in_buffer: ReadBuffer::new(),
out_buffer: Vec::new(),
header: None,
}
}
/// Create a new frame codec from partially read data.
@ -153,7 +163,7 @@ impl FrameCodec {
// Not enough data in buffer.
let size = self.in_buffer.read_from(stream)?;
if size == 0 {
trace!("no frame received");
// trace!("no frame received");
return Ok(None);
}
};
@ -161,7 +171,7 @@ impl FrameCodec {
let (header, length) = self.header.take().expect("Bug: no frame header");
debug_assert_eq!(payload.len() as u64, length);
let frame = Frame::from_payload(header, payload);
trace!("received frame {}", frame);
// trace!("received frame {}", frame);
Ok(Some(frame))
}
@ -170,9 +180,11 @@ impl FrameCodec {
where
Stream: Write,
{
trace!("writing frame {}", frame);
// trace!("writing frame {}", frame);
self.out_buffer.reserve(frame.len());
frame.format(&mut self.out_buffer).expect("Bug: can't write to vector");
frame
.format(&mut self.out_buffer)
.expect("Bug: can't write to vector");
self.write_pending(stream)
}
@ -219,7 +231,10 @@ mod tests {
sock.read_frame(None).unwrap().unwrap().into_data(),
vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07]
);
assert_eq!(sock.read_frame(None).unwrap().unwrap().into_data(), vec![0x03, 0x02, 0x01]);
assert_eq!(
sock.read_frame(None).unwrap().unwrap().into_data(),
vec![0x03, 0x02, 0x01]
);
assert!(sock.read_frame(None).unwrap().is_none());
let (_, rest) = sock.into_inner();
@ -265,7 +280,10 @@ mod tests {
let mut sock = FrameSocket::new(raw);
assert!(matches!(
sock.read_frame(Some(5)),
Err(Error::Capacity(CapacityError::MessageTooLong { size: 7, max_size: 5 }))
Err(Error::Capacity(CapacityError::MessageTooLong {
size: 7,
max_size: 5
}))
));
}
}

@ -88,7 +88,10 @@ impl<Stream> WebSocket<Stream> {
/// or together with an existing one. If you need an initial handshake, use
/// `connect()` or `accept()` functions of the crate to construct a websocket.
pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
WebSocket { socket: stream, context: WebSocketContext::new(role, config) }
WebSocket {
socket: stream,
context: WebSocketContext::new(role, config),
}
}
/// Convert a raw socket into a WebSocket without performing a handshake.
@ -308,7 +311,7 @@ impl WebSocketContext {
// If we get here, either write blocks or we have nothing to write.
// Thus if read blocks, just let it return WouldBlock.
if let Some(message) = self.read_message_frame(stream)? {
trace!("Received message {}", message);
// trace!("Received message {}", message);
return Ok(message);
}
}
@ -375,11 +378,11 @@ impl WebSocketContext {
// response, unless it already received a Close frame. It SHOULD
// respond with Pong frame as soon as is practical. (RFC 6455)
if let Some(pong) = self.pong.take() {
trace!("Sending pong reply");
// trace!("Sending pong reply");
self.send_one_frame(stream, pong)?;
}
// If we have any unsent frames, send them.
trace!("Frames still in queue: {}", self.send_queue.len());
// trace!("Frames still in queue: {}", self.send_queue.len());
while let Some(data) = self.send_queue.pop_front() {
self.send_one_frame(stream, data)?;
}
@ -602,8 +605,10 @@ impl WebSocketContext {
}
}
trace!("Sending frame: {:?}", frame);
self.frame.write_frame(stream, frame).check_connection_reset(self.state)
// trace!("Sending frame: {:?}", frame);
self.frame
.write_frame(stream, frame)
.check_connection_reset(self.state)
}
}
@ -698,8 +703,14 @@ mod tests {
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None);
assert_eq!(socket.read_message().unwrap(), Message::Ping(vec![1, 2]));
assert_eq!(socket.read_message().unwrap(), Message::Pong(vec![3]));
assert_eq!(socket.read_message().unwrap(), Message::Text("Hello, World!".into()));
assert_eq!(socket.read_message().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
assert_eq!(
socket.read_message().unwrap(),
Message::Text("Hello, World!".into())
);
assert_eq!(
socket.read_message().unwrap(),
Message::Binary(vec![0x01, 0x02, 0x03])
);
}
#[test]
@ -708,24 +719,36 @@ mod tests {
0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72,
0x6c, 0x64, 0x21,
]);
let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() };
let limit = WebSocketConfig {
max_message_size: Some(10),
..WebSocketConfig::default()
};
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
assert!(matches!(
socket.read_message(),
Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
Err(Error::Capacity(CapacityError::MessageTooLong {
size: 13,
max_size: 10
}))
));
}
#[test]
fn size_limiting_binary() {
let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]);
let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() };
let limit = WebSocketConfig {
max_message_size: Some(2),
..WebSocketConfig::default()
};
let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
assert!(matches!(
socket.read_message(),
Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
Err(Error::Capacity(CapacityError::MessageTooLong {
size: 3,
max_size: 2
}))
));
}
}

Loading…
Cancel
Save