From d722edd4af55a33201a0fdb3969780694973cbcc Mon Sep 17 00:00:00 2001 From: Tpt Date: Sun, 19 Sep 2021 22:26:48 +0200 Subject: [PATCH] Use OxHTTP HTTP client --- lib/Cargo.toml | 6 +- lib/src/sparql/http/dummy.rs | 20 +- lib/src/sparql/http/simple.rs | 458 ++++------------------------------ lib/src/sparql/service.rs | 44 +--- lib/src/sparql/update.rs | 38 +-- 5 files changed, 86 insertions(+), 480 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 7a3f55fd..178fa904 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -19,7 +19,7 @@ all-features = true [features] default = [] sophia = ["sophia_api"] -http_client = ["httparse", "native-tls"] +http_client = ["oxhttp", "oxhttp/native-tls"] [dependencies] quick-xml = "0.22" @@ -39,14 +39,12 @@ nom = "7" siphasher = "0.3" lasso = "0.5" sophia_api = { version = "0.7", optional = true } -http = "0.2" -httparse = { version = "1", optional = true } -native-tls = { version = "0.2", optional = true } json-event-parser = "0.1" spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] sled = "0.34" +oxhttp = { git = "https://github.com/oxigraph/oxhttp", branch = "master", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys = "0.3" diff --git a/lib/src/sparql/http/dummy.rs b/lib/src/sparql/http/dummy.rs index 0116c2f2..37497456 100644 --- a/lib/src/sparql/http/dummy.rs +++ b/lib/src/sparql/http/dummy.rs @@ -1,9 +1,8 @@ //! Simple HTTP client use crate::error::invalid_input_error; -use http::{Request, Response}; use std::io; -use std::io::BufRead; +use std::io::{BufRead, Empty, Read, Result}; pub struct Client {} @@ -12,12 +11,21 @@ impl Client { Self {} } - pub fn request( + pub fn get(&self, _url: &str, _accept: &str) -> Result<(String, Empty)> { + Err(invalid_input_error( + "HTTP client is not available. Enable the feature 'http_client'", + )) + } + + pub fn post( &self, - _request: &Request>>, - ) -> io::Result>> { + _url: &str, + _payload: Vec, + _content_type: &str, + _accept: &str, + ) -> Result<(String, Empty)> { Err(invalid_input_error( - "HTTP client is not available. Enable the feature 'simple_http'", + "HTTP client is not available. Enable the feature 'http_client'", )) } } diff --git a/lib/src/sparql/http/simple.rs b/lib/src/sparql/http/simple.rs index d8a17857..aa278d05 100644 --- a/lib/src/sparql/http/simple.rs +++ b/lib/src/sparql/http/simple.rs @@ -1,420 +1,70 @@ -//! Simple HTTP client - use crate::error::{invalid_data_error, invalid_input_error}; -use http::header::{CONNECTION, CONTENT_LENGTH, HOST, TRANSFER_ENCODING}; -use http::{Request, Response, Version}; -use httparse::Status; -use native_tls::TlsConnector; -use std::cmp::min; -use std::convert::TryInto; -use std::io; -use std::io::{BufRead, BufReader, Read, Write}; -use std::net::TcpStream; +use oxhttp::model::{Body, HeaderName, Method, Request}; +use std::io::{Read, Result}; -pub struct Client {} +#[derive(Default)] +pub struct Client { + client: oxhttp::Client, +} impl Client { pub fn new() -> Self { - Self {} - } - - #[allow(clippy::unused_self)] - pub fn request( - &self, - request: &Request>>, - ) -> io::Result>> { - let scheme = request - .uri() - .scheme_str() - .ok_or_else(|| invalid_input_error("No host provided"))?; - let port = if let Some(port) = request.uri().port_u16() { - port - } else { - match scheme { - "http" => 80, - "https" => 443, - _ => { - return Err(invalid_input_error(format!( - "No port provided for scheme '{}'", - scheme - ))) - } - } - }; - let host = request - .uri() - .host() - .ok_or_else(|| invalid_input_error("No host provided"))?; - - match scheme { - "http" => { - let mut stream = TcpStream::connect((host, port))?; - Self::encode(request, &mut stream)?; - Self::decode(stream) - } - "https" => { - let connector = - TlsConnector::new().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let stream = TcpStream::connect((host, port))?; - let mut stream = connector - .connect(host, stream) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - Self::encode(request, &mut stream)?; - Self::decode(stream) - } - _ => Err(invalid_input_error(format!( - "Not supported URL scheme: {}", - scheme - ))), - } - } - - fn encode(request: &Request>>, mut writer: &mut impl Write) -> io::Result<()> { - if request.headers().contains_key(CONTENT_LENGTH) { - return Err(invalid_input_error( - "content-length header is set by the client library", - )); - } - if request.headers().contains_key(HOST) { - return Err(invalid_input_error( - "host header is set by the client library", - )); - } - if request.headers().contains_key(CONNECTION) { - return Err(invalid_input_error( - "connection header is set by the client library", - )); - } - if let Some(query) = request.uri().query() { - write!( - &mut writer, - "{} {}?{} HTTP/1.1\r\n", - request.method(), - request.uri().path(), - query - )?; - } else { - write!( - &mut writer, - "{} {} HTTP/1.1\r\n", - request.method(), - request.uri().path() - )?; - } - - // host - let host = request - .uri() - .host() - .ok_or_else(|| invalid_input_error("No host provided"))?; - if let Some(port) = request.uri().port() { - write!(writer, "host: {}:{}\r\n", request.uri(), port) - } else { - write!(writer, "host: {}\r\n", host) - }?; - - // connection - write!(writer, "connection: close\r\n")?; - - // headers - for (name, value) in request.headers() { - write!(writer, "{}: ", name.as_str())?; - writer.write_all(value.as_bytes())?; - write!(writer, "\r\n")?; - } - - // body with content-length - if let Some(payload) = request.body() { - write!(writer, "content-length: {}\r\n\r\n", payload.len())?; - writer.write_all(payload)?; - } else { - write!(writer, "\r\n")?; - } - Ok(()) + Self::default() } - fn decode<'a>(reader: impl Read + 'a) -> io::Result>> { - let mut reader = BufReader::new(reader); - - // Let's read the headers - let mut buffer = Vec::new(); - let mut headers = [httparse::EMPTY_HEADER; 1024]; - let mut parsed_response = httparse::Response::new(&mut headers); - loop { - if reader.read_until(b'\n', &mut buffer)? == 0 { - return Err(invalid_data_error("Empty HTTP response")); - } - if buffer.len() > 8 * 1024 { - return Err(invalid_data_error("The headers size should fit in 8kb")); - } - - if buffer.ends_with(b"\r\n\r\n") || buffer.ends_with(b"\n\n") { - break; //end of buffer - } - } - if parsed_response - .parse(&buffer) - .map_err(invalid_data_error)? - .is_partial() - { - return Err(invalid_input_error( - "Partial HTTP headers containing two line jumps", - )); - } - - // Let's build the response - let mut response = Response::builder() - .status( - parsed_response - .code - .ok_or_else(|| invalid_data_error("No status code in the HTTP response"))?, - ) - .version(match parsed_response.version { - Some(0) => Version::HTTP_10, - Some(1) => Version::HTTP_11, - Some(id) => { - return Err(invalid_data_error(format!( - "Unsupported HTTP response version: 1.{}", - id - ))) - } - None => return Err(invalid_data_error("No HTTP version in the HTTP response")), - }); - for header in parsed_response.headers { - response = response.header(header.name, header.value); - } - - let content_length = response.headers_ref().and_then(|h| h.get(CONTENT_LENGTH)); - let transfer_encoding = response - .headers_ref() - .and_then(|h| h.get(TRANSFER_ENCODING)); - if transfer_encoding.is_some() && content_length.is_some() { - return Err(invalid_data_error( - "Transfer-Encoding and Content-Length should not be set at the same time", - )); - } - - let body: Box = if let Some(content_length) = content_length { - let len = content_length - .to_str() - .map_err(invalid_data_error)? - .parse::() - .map_err(invalid_data_error)?; - Box::new(reader.take(len)) - } else if let Some(transfer_encoding) = transfer_encoding { - let transfer_encoding = transfer_encoding.to_str().map_err(invalid_data_error)?; - if transfer_encoding.eq_ignore_ascii_case("chunked") { - buffer.clear(); - Box::new(BufReader::new(ChunkedResponse { - reader, - buffer, - is_start: true, - chunk_position: 1, - chunk_size: 1, - })) - } else { - return Err(invalid_data_error(format!( - "Transfer-Encoding: {} is not supported", - transfer_encoding - ))); - } - } else { - Box::new(io::empty()) - }; - - response.body(body).map_err(invalid_data_error) - } -} - -struct ChunkedResponse { - reader: R, - buffer: Vec, - is_start: bool, - chunk_position: usize, - chunk_size: usize, -} - -impl Read for ChunkedResponse { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - loop { - // In case we still have data - if self.chunk_position < self.chunk_size { - let inner_buf = self.reader.fill_buf()?; - let size = min( - min(buf.len(), inner_buf.len()), - self.chunk_size - self.chunk_position, - ); - buf[..size].copy_from_slice(&inner_buf[..size]); - self.reader.consume(size); - self.chunk_position += size; - return Ok(size); // Won't be 0 if there is still some inner buffer - } - - if self.chunk_size == 0 { - return Ok(0); // We know it's the end - } - - if self.is_start { - self.is_start = false; - } else { - // chunk end - self.buffer.clear(); - self.reader.read_until(b'\n', &mut self.buffer)?; - if self.buffer != b"\r\n" && self.buffer != b"\n" { - return Err(invalid_data_error("Invalid chunked element end")); - } - } - - // We load a new chunk - self.buffer.clear(); - self.reader.read_until(b'\n', &mut self.buffer)?; - self.chunk_position = 0; - self.chunk_size = if let Ok(Status::Complete((read, chunk_size))) = - httparse::parse_chunk_size(&self.buffer) - { - if read != self.buffer.len() { - return Err(invalid_data_error("Chuncked header containing a line jump")); - } - chunk_size.try_into().map_err(invalid_data_error)? - } else { - return Err(invalid_data_error("Invalid chuncked header")); - }; - - if self.chunk_size == 0 { - // we read the trailers - loop { - if self.reader.read_until(b'\n', &mut self.buffer)? == 0 { - return Err(invalid_data_error("Missing chunked encoding end")); - } - if self.buffer.len() > 8 * 1024 { - return Err(invalid_data_error("The trailers size should fit in 8kb")); - } - if self.buffer.ends_with(b"\r\n\r\n") || self.buffer.ends_with(b"\n\n") { - break; //end of buffer - } - } - return Ok(0); - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use http::header::{ACCEPT, CONTENT_TYPE}; - use http::{Method, StatusCode}; - use std::io::Cursor; - use std::str; - - #[test] - fn encode_get_request() -> io::Result<()> { - let mut buffer = Vec::new(); - Client::encode( - &Request::builder() - .method(Method::GET) - .uri("http://example.com/foo/bar?query#fragment") - .header(ACCEPT, "application/json") - .body(None) - .unwrap(), - &mut buffer, - )?; - assert_eq!( - str::from_utf8(&buffer).unwrap(), - "GET /foo/bar?query HTTP/1.1\r\nhost: example.com\r\nconnection: close\r\naccept: application/json\r\n\r\n" + pub fn get(&self, url: &str, accept: &str) -> Result<(String, Body)> { + let mut request = Request::new(Method::GET, url.parse().map_err(invalid_input_error)?); + request.headers_mut().append( + HeaderName::ACCEPT, + accept.parse().map_err(invalid_input_error)?, ); - Ok(()) - } - - #[test] - fn encode_post_request() -> io::Result<()> { - let mut buffer = Vec::new(); - Client::encode( - &Request::builder() - .method(Method::POST) - .uri("http://example.com/foo/bar?query#fragment") - .header(ACCEPT, "application/json") - .body(Some(b"testbody".to_vec())) - .unwrap(), - &mut buffer, - )?; - assert_eq!( - str::from_utf8(&buffer).unwrap(), - "POST /foo/bar?query HTTP/1.1\r\nhost: example.com\r\nconnection: close\r\naccept: application/json\r\ncontent-length: 8\r\n\r\ntestbody" + request.headers_mut().append( + HeaderName::USER_AGENT, + concat!("Oxigraph/", env!("CARGO_PKG_VERSION")) + .parse() + .map_err(invalid_input_error)?, ); - Ok(()) - } - - #[test] - fn decode_response_without_payload() -> io::Result<()> { - let response = Client::decode(Cursor::new("HTTP/1.1 404 Not Found\r\n\r\n")).unwrap(); - assert_eq!(response.status(), StatusCode::NOT_FOUND); - let mut buf = String::new(); - response.into_body().read_to_string(&mut buf)?; - assert!(buf.is_empty()); - Ok(()) + let response = self.client.request(request)?; + let content_type = response + .headers() + .get(&HeaderName::CONTENT_TYPE) + .ok_or_else(|| invalid_data_error(format!("No Content-Type returned by {}", url)))? + .to_str() + .map_err(invalid_data_error)? + .to_owned(); + Ok((content_type, response.into_body())) } - #[test] - fn decode_response_with_fixed_payload() -> io::Result<()> { - let response = Client::decode(Cursor::new( - "HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ncontent-length:8\r\n\r\ntestbody", - ))?; - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response - .headers() - .get(CONTENT_TYPE) - .unwrap() - .to_str() - .unwrap(), - "text/plain" + pub fn post( + &self, + url: &str, + payload: Vec, + content_type: &str, + accept: &str, + ) -> Result<(String, Body)> { + let mut request = Request::new(Method::GET, url.parse().map_err(invalid_input_error)?); + request.headers_mut().append( + HeaderName::ACCEPT, + accept.parse().map_err(invalid_input_error)?, ); - let mut buf = String::new(); - response.into_body().read_to_string(&mut buf)?; - assert_eq!(buf, "testbody"); - Ok(()) - } - - #[test] - fn decode_response_with_chunked_payload() -> io::Result<()> { - let response = Client::decode(Cursor::new( - "HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding:chunked\r\n\r\n4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n", - ))?; - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response - .headers() - .get(CONTENT_TYPE) - .unwrap() - .to_str() - .unwrap(), - "text/plain" + request.headers_mut().append( + HeaderName::USER_AGENT, + concat!("Oxigraph/", env!("CARGO_PKG_VERSION")) + .parse() + .map_err(invalid_input_error)?, ); - let mut buf = String::new(); - response.into_body().read_to_string(&mut buf)?; - assert_eq!(buf, "Wikipedia in\r\n\r\nchunks."); - Ok(()) - } - - #[test] - fn decode_response_with_trailer() -> io::Result<()> { - let response = Client::decode(Cursor::new( - "HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding:chunked\r\n\r\n4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\ntest: foo\r\n\r\n", - ))?; - assert_eq!(response.status(), StatusCode::OK); - assert_eq!( - response - .headers() - .get(CONTENT_TYPE) - .unwrap() - .to_str() - .unwrap(), - "text/plain" + request.headers_mut().append( + HeaderName::CONTENT_TYPE, + content_type.parse().map_err(invalid_input_error)?, ); - let mut buf = String::new(); - response.into_body().read_to_string(&mut buf)?; - assert_eq!(buf, "Wikipedia in\r\n\r\nchunks."); - Ok(()) + let response = self.client.request(request.with_body(payload))?; + let content_type = response + .headers() + .get(&HeaderName::CONTENT_TYPE) + .ok_or_else(|| invalid_data_error(format!("No Content-Type returned by {}", url)))? + .to_str() + .map_err(invalid_data_error)? + .to_owned(); + Ok((content_type, response.into_body())) } } diff --git a/lib/src/sparql/service.rs b/lib/src/sparql/service.rs index cf3a391d..539d5af8 100644 --- a/lib/src/sparql/service.rs +++ b/lib/src/sparql/service.rs @@ -1,13 +1,11 @@ -use crate::error::{invalid_data_error, invalid_input_error}; use crate::model::NamedNode; use crate::sparql::algebra::Query; use crate::sparql::error::EvaluationError; use crate::sparql::http::Client; use crate::sparql::model::QueryResults; use crate::sparql::QueryResultsFormat; -use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT}; -use http::{Method, Request, StatusCode}; use std::error::Error; +use std::io::BufReader; /// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE. /// @@ -113,42 +111,18 @@ impl ServiceHandler for SimpleServiceHandler { service_name: NamedNode, query: Query, ) -> Result { - let request = Request::builder() - .method(Method::POST) - .uri(service_name.as_str()) - .header(CONTENT_TYPE, "application/sparql-query") - .header( - ACCEPT, - "application/sparql-results+json, application/sparql-results+xml", - ) - .header(USER_AGENT, concat!("Oxigraph/", env!("CARGO_PKG_VERSION"))) - .body(Some(query.to_string().into_bytes())) - .map_err(invalid_input_error)?; - let response = self.client.request(&request)?; - if response.status() != StatusCode::OK { - return Err(EvaluationError::msg(format!( - "HTTP error code {} returned when querying service {}", - response.status(), - service_name - ))); - } - let content_type = response - .headers() - .get(CONTENT_TYPE) - .ok_or_else(|| { - EvaluationError::msg(format!( - "No Content-Type header returned by {}", - service_name - )) - })? - .to_str() - .map_err(invalid_data_error)?; - let format = QueryResultsFormat::from_media_type(content_type).ok_or_else(|| { + let (content_type, body) = self.client.post( + service_name.as_str(), + query.to_string().into_bytes(), + "application/sparql-query", + "application/sparql-results+json, application/sparql-results+xml", + )?; + let format = QueryResultsFormat::from_media_type(&content_type).ok_or_else(|| { EvaluationError::msg(format!( "Unsupported Content-Type returned by {}: {}", service_name, content_type )) })?; - Ok(QueryResults::read(response.into_body(), format)?) + Ok(QueryResults::read(BufReader::new(body), format)?) } } diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index c981cbe6..9603ff79 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -1,4 +1,3 @@ -use crate::error::{invalid_data_error, invalid_input_error}; use crate::io::GraphFormat; use crate::model::{ BlankNode as OxBlankNode, GraphName as OxGraphName, GraphNameRef, Literal as OxLiteral, @@ -14,8 +13,6 @@ use crate::sparql::{EvaluationError, UpdateOptions}; use crate::storage::io::load_graph; use crate::storage::numeric_encoder::{Decoder, EncodedTerm}; use crate::storage::Storage; -use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT}; -use http::{Method, Request, StatusCode}; use oxiri::Iri; use spargebra::algebra::{GraphPattern, GraphTarget}; use spargebra::term::{ @@ -27,6 +24,7 @@ use spargebra::term::{ use spargebra::GraphUpdateOperation; use std::collections::HashMap; use std::io; +use std::io::BufReader; use std::rc::Rc; pub struct SimpleUpdateEvaluator<'a> { @@ -150,33 +148,11 @@ impl<'a> SimpleUpdateEvaluator<'a> { } fn eval_load(&mut self, from: &NamedNode, to: &GraphName) -> Result<(), EvaluationError> { - let request = Request::builder() - .method(Method::GET) - .uri(&from.iri) - .header( - ACCEPT, - "application/n-triples, text/turtle, application/rdf+xml", - ) - .header(USER_AGENT, concat!("Oxigraph/", env!("CARGO_PKG_VERSION"))) - .body(None) - .map_err(invalid_input_error)?; - let response = self.client.request(&request)?; - if response.status() != StatusCode::OK { - return Err(EvaluationError::msg(format!( - "HTTP error code {} returned when fetching {}", - response.status(), - from - ))); - } - let content_type = response - .headers() - .get(CONTENT_TYPE) - .ok_or_else(|| { - EvaluationError::msg(format!("No Content-Type header returned by {}", from)) - })? - .to_str() - .map_err(invalid_data_error)?; - let format = GraphFormat::from_media_type(content_type).ok_or_else(|| { + let (content_type, body) = self.client.get( + &from.iri, + "application/n-triples, text/turtle, application/rdf+xml", + )?; + let format = GraphFormat::from_media_type(&content_type).ok_or_else(|| { EvaluationError::msg(format!( "Unsupported Content-Type returned by {}: {}", from, content_type @@ -188,7 +164,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { }; load_graph( self.storage, - response.into_body(), + BufReader::new(body), format, to_graph_name, Some(&from.iri),