Use OxHTTP HTTP client

pull/171/head
Tpt 3 years ago
parent b82168bc6a
commit d722edd4af
  1. 6
      lib/Cargo.toml
  2. 20
      lib/src/sparql/http/dummy.rs
  3. 454
      lib/src/sparql/http/simple.rs
  4. 42
      lib/src/sparql/service.rs
  5. 36
      lib/src/sparql/update.rs

@ -19,7 +19,7 @@ all-features = true
[features] [features]
default = [] default = []
sophia = ["sophia_api"] sophia = ["sophia_api"]
http_client = ["httparse", "native-tls"] http_client = ["oxhttp", "oxhttp/native-tls"]
[dependencies] [dependencies]
quick-xml = "0.22" quick-xml = "0.22"
@ -39,14 +39,12 @@ nom = "7"
siphasher = "0.3" siphasher = "0.3"
lasso = "0.5" lasso = "0.5"
sophia_api = { version = "0.7", optional = true } sophia_api = { version = "0.7", optional = true }
http = "0.2"
httparse = { version = "1", optional = true }
native-tls = { version = "0.2", optional = true }
json-event-parser = "0.1" json-event-parser = "0.1"
spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] } spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
sled = "0.34" sled = "0.34"
oxhttp = { git = "https://github.com/oxigraph/oxhttp", branch = "master", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3" js-sys = "0.3"

@ -1,9 +1,8 @@
//! Simple HTTP client //! Simple HTTP client
use crate::error::invalid_input_error; use crate::error::invalid_input_error;
use http::{Request, Response};
use std::io; use std::io;
use std::io::BufRead; use std::io::{BufRead, Empty, Read, Result};
pub struct Client {} pub struct Client {}
@ -12,12 +11,21 @@ impl Client {
Self {} Self {}
} }
pub fn request( pub fn get(&self, _url: &str, _accept: &str) -> Result<(String, Empty)> {
Err(invalid_input_error(
"HTTP client is not available. Enable the feature 'http_client'",
))
}
pub fn post(
&self, &self,
_request: &Request<Option<Vec<u8>>>, _url: &str,
) -> io::Result<Response<Box<dyn BufRead>>> { _payload: Vec<u8>,
_content_type: &str,
_accept: &str,
) -> Result<(String, Empty)> {
Err(invalid_input_error( Err(invalid_input_error(
"HTTP client is not available. Enable the feature 'simple_http'", "HTTP client is not available. Enable the feature 'http_client'",
)) ))
} }
} }

@ -1,420 +1,70 @@
//! Simple HTTP client
use crate::error::{invalid_data_error, invalid_input_error}; use crate::error::{invalid_data_error, invalid_input_error};
use http::header::{CONNECTION, CONTENT_LENGTH, HOST, TRANSFER_ENCODING}; use oxhttp::model::{Body, HeaderName, Method, Request};
use http::{Request, Response, Version}; use std::io::{Read, Result};
use httparse::Status;
use native_tls::TlsConnector;
use std::cmp::min;
use std::convert::TryInto;
use std::io;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpStream;
pub struct Client {} #[derive(Default)]
pub struct Client {
client: oxhttp::Client,
}
impl Client { impl Client {
pub fn new() -> Self { pub fn new() -> Self {
Self {} Self::default()
}
#[allow(clippy::unused_self)]
pub fn request(
&self,
request: &Request<Option<Vec<u8>>>,
) -> io::Result<Response<Box<dyn BufRead>>> {
let scheme = request
.uri()
.scheme_str()
.ok_or_else(|| invalid_input_error("No host provided"))?;
let port = if let Some(port) = request.uri().port_u16() {
port
} else {
match scheme {
"http" => 80,
"https" => 443,
_ => {
return Err(invalid_input_error(format!(
"No port provided for scheme '{}'",
scheme
)))
}
}
};
let host = request
.uri()
.host()
.ok_or_else(|| invalid_input_error("No host provided"))?;
match scheme {
"http" => {
let mut stream = TcpStream::connect((host, port))?;
Self::encode(request, &mut stream)?;
Self::decode(stream)
}
"https" => {
let connector =
TlsConnector::new().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let stream = TcpStream::connect((host, port))?;
let mut stream = connector
.connect(host, stream)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Self::encode(request, &mut stream)?;
Self::decode(stream)
}
_ => Err(invalid_input_error(format!(
"Not supported URL scheme: {}",
scheme
))),
}
}
fn encode(request: &Request<Option<Vec<u8>>>, mut writer: &mut impl Write) -> io::Result<()> {
if request.headers().contains_key(CONTENT_LENGTH) {
return Err(invalid_input_error(
"content-length header is set by the client library",
));
}
if request.headers().contains_key(HOST) {
return Err(invalid_input_error(
"host header is set by the client library",
));
}
if request.headers().contains_key(CONNECTION) {
return Err(invalid_input_error(
"connection header is set by the client library",
));
}
if let Some(query) = request.uri().query() {
write!(
&mut writer,
"{} {}?{} HTTP/1.1\r\n",
request.method(),
request.uri().path(),
query
)?;
} else {
write!(
&mut writer,
"{} {} HTTP/1.1\r\n",
request.method(),
request.uri().path()
)?;
}
// host
let host = request
.uri()
.host()
.ok_or_else(|| invalid_input_error("No host provided"))?;
if let Some(port) = request.uri().port() {
write!(writer, "host: {}:{}\r\n", request.uri(), port)
} else {
write!(writer, "host: {}\r\n", host)
}?;
// connection
write!(writer, "connection: close\r\n")?;
// headers
for (name, value) in request.headers() {
write!(writer, "{}: ", name.as_str())?;
writer.write_all(value.as_bytes())?;
write!(writer, "\r\n")?;
}
// body with content-length
if let Some(payload) = request.body() {
write!(writer, "content-length: {}\r\n\r\n", payload.len())?;
writer.write_all(payload)?;
} else {
write!(writer, "\r\n")?;
}
Ok(())
}
fn decode<'a>(reader: impl Read + 'a) -> io::Result<Response<Box<dyn BufRead + 'a>>> {
let mut reader = BufReader::new(reader);
// Let's read the headers
let mut buffer = Vec::new();
let mut headers = [httparse::EMPTY_HEADER; 1024];
let mut parsed_response = httparse::Response::new(&mut headers);
loop {
if reader.read_until(b'\n', &mut buffer)? == 0 {
return Err(invalid_data_error("Empty HTTP response"));
}
if buffer.len() > 8 * 1024 {
return Err(invalid_data_error("The headers size should fit in 8kb"));
}
if buffer.ends_with(b"\r\n\r\n") || buffer.ends_with(b"\n\n") {
break; //end of buffer
}
}
if parsed_response
.parse(&buffer)
.map_err(invalid_data_error)?
.is_partial()
{
return Err(invalid_input_error(
"Partial HTTP headers containing two line jumps",
));
} }
// Let's build the response pub fn get(&self, url: &str, accept: &str) -> Result<(String, Body)> {
let mut response = Response::builder() let mut request = Request::new(Method::GET, url.parse().map_err(invalid_input_error)?);
.status( request.headers_mut().append(
parsed_response HeaderName::ACCEPT,
.code accept.parse().map_err(invalid_input_error)?,
.ok_or_else(|| invalid_data_error("No status code in the HTTP response"))?,
)
.version(match parsed_response.version {
Some(0) => Version::HTTP_10,
Some(1) => Version::HTTP_11,
Some(id) => {
return Err(invalid_data_error(format!(
"Unsupported HTTP response version: 1.{}",
id
)))
}
None => return Err(invalid_data_error("No HTTP version in the HTTP response")),
});
for header in parsed_response.headers {
response = response.header(header.name, header.value);
}
let content_length = response.headers_ref().and_then(|h| h.get(CONTENT_LENGTH));
let transfer_encoding = response
.headers_ref()
.and_then(|h| h.get(TRANSFER_ENCODING));
if transfer_encoding.is_some() && content_length.is_some() {
return Err(invalid_data_error(
"Transfer-Encoding and Content-Length should not be set at the same time",
));
}
let body: Box<dyn BufRead> = if let Some(content_length) = content_length {
let len = content_length
.to_str()
.map_err(invalid_data_error)?
.parse::<u64>()
.map_err(invalid_data_error)?;
Box::new(reader.take(len))
} else if let Some(transfer_encoding) = transfer_encoding {
let transfer_encoding = transfer_encoding.to_str().map_err(invalid_data_error)?;
if transfer_encoding.eq_ignore_ascii_case("chunked") {
buffer.clear();
Box::new(BufReader::new(ChunkedResponse {
reader,
buffer,
is_start: true,
chunk_position: 1,
chunk_size: 1,
}))
} else {
return Err(invalid_data_error(format!(
"Transfer-Encoding: {} is not supported",
transfer_encoding
)));
}
} else {
Box::new(io::empty())
};
response.body(body).map_err(invalid_data_error)
}
}
struct ChunkedResponse<R: BufRead> {
reader: R,
buffer: Vec<u8>,
is_start: bool,
chunk_position: usize,
chunk_size: usize,
}
impl<R: BufRead> Read for ChunkedResponse<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// In case we still have data
if self.chunk_position < self.chunk_size {
let inner_buf = self.reader.fill_buf()?;
let size = min(
min(buf.len(), inner_buf.len()),
self.chunk_size - self.chunk_position,
); );
buf[..size].copy_from_slice(&inner_buf[..size]); request.headers_mut().append(
self.reader.consume(size); HeaderName::USER_AGENT,
self.chunk_position += size; concat!("Oxigraph/", env!("CARGO_PKG_VERSION"))
return Ok(size); // Won't be 0 if there is still some inner buffer .parse()
} .map_err(invalid_input_error)?,
if self.chunk_size == 0 {
return Ok(0); // We know it's the end
}
if self.is_start {
self.is_start = false;
} else {
// chunk end
self.buffer.clear();
self.reader.read_until(b'\n', &mut self.buffer)?;
if self.buffer != b"\r\n" && self.buffer != b"\n" {
return Err(invalid_data_error("Invalid chunked element end"));
}
}
// We load a new chunk
self.buffer.clear();
self.reader.read_until(b'\n', &mut self.buffer)?;
self.chunk_position = 0;
self.chunk_size = if let Ok(Status::Complete((read, chunk_size))) =
httparse::parse_chunk_size(&self.buffer)
{
if read != self.buffer.len() {
return Err(invalid_data_error("Chuncked header containing a line jump"));
}
chunk_size.try_into().map_err(invalid_data_error)?
} else {
return Err(invalid_data_error("Invalid chuncked header"));
};
if self.chunk_size == 0 {
// we read the trailers
loop {
if self.reader.read_until(b'\n', &mut self.buffer)? == 0 {
return Err(invalid_data_error("Missing chunked encoding end"));
}
if self.buffer.len() > 8 * 1024 {
return Err(invalid_data_error("The trailers size should fit in 8kb"));
}
if self.buffer.ends_with(b"\r\n\r\n") || self.buffer.ends_with(b"\n\n") {
break; //end of buffer
}
}
return Ok(0);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use http::header::{ACCEPT, CONTENT_TYPE};
use http::{Method, StatusCode};
use std::io::Cursor;
use std::str;
#[test]
fn encode_get_request() -> io::Result<()> {
let mut buffer = Vec::new();
Client::encode(
&Request::builder()
.method(Method::GET)
.uri("http://example.com/foo/bar?query#fragment")
.header(ACCEPT, "application/json")
.body(None)
.unwrap(),
&mut buffer,
)?;
assert_eq!(
str::from_utf8(&buffer).unwrap(),
"GET /foo/bar?query HTTP/1.1\r\nhost: example.com\r\nconnection: close\r\naccept: application/json\r\n\r\n"
);
Ok(())
}
#[test]
fn encode_post_request() -> io::Result<()> {
let mut buffer = Vec::new();
Client::encode(
&Request::builder()
.method(Method::POST)
.uri("http://example.com/foo/bar?query#fragment")
.header(ACCEPT, "application/json")
.body(Some(b"testbody".to_vec()))
.unwrap(),
&mut buffer,
)?;
assert_eq!(
str::from_utf8(&buffer).unwrap(),
"POST /foo/bar?query HTTP/1.1\r\nhost: example.com\r\nconnection: close\r\naccept: application/json\r\ncontent-length: 8\r\n\r\ntestbody"
); );
Ok(()) let response = self.client.request(request)?;
} let content_type = response
#[test]
fn decode_response_without_payload() -> io::Result<()> {
let response = Client::decode(Cursor::new("HTTP/1.1 404 Not Found\r\n\r\n")).unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert!(buf.is_empty());
Ok(())
}
#[test]
fn decode_response_with_fixed_payload() -> io::Result<()> {
let response = Client::decode(Cursor::new(
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ncontent-length:8\r\n\r\ntestbody",
))?;
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers() .headers()
.get(CONTENT_TYPE) .get(&HeaderName::CONTENT_TYPE)
.unwrap() .ok_or_else(|| invalid_data_error(format!("No Content-Type returned by {}", url)))?
.to_str() .to_str()
.unwrap(), .map_err(invalid_data_error)?
"text/plain" .to_owned();
); Ok((content_type, response.into_body()))
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert_eq!(buf, "testbody");
Ok(())
} }
#[test] pub fn post(
fn decode_response_with_chunked_payload() -> io::Result<()> { &self,
let response = Client::decode(Cursor::new( url: &str,
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding:chunked\r\n\r\n4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n", payload: Vec<u8>,
))?; content_type: &str,
assert_eq!(response.status(), StatusCode::OK); accept: &str,
assert_eq!( ) -> Result<(String, Body)> {
response let mut request = Request::new(Method::GET, url.parse().map_err(invalid_input_error)?);
.headers() request.headers_mut().append(
.get(CONTENT_TYPE) HeaderName::ACCEPT,
.unwrap() accept.parse().map_err(invalid_input_error)?,
.to_str()
.unwrap(),
"text/plain"
); );
let mut buf = String::new(); request.headers_mut().append(
response.into_body().read_to_string(&mut buf)?; HeaderName::USER_AGENT,
assert_eq!(buf, "Wikipedia in\r\n\r\nchunks."); concat!("Oxigraph/", env!("CARGO_PKG_VERSION"))
Ok(()) .parse()
} .map_err(invalid_input_error)?,
);
#[test] request.headers_mut().append(
fn decode_response_with_trailer() -> io::Result<()> { HeaderName::CONTENT_TYPE,
let response = Client::decode(Cursor::new( content_type.parse().map_err(invalid_input_error)?,
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding:chunked\r\n\r\n4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\ntest: foo\r\n\r\n", );
))?; let response = self.client.request(request.with_body(payload))?;
assert_eq!(response.status(), StatusCode::OK); let content_type = response
assert_eq!(
response
.headers() .headers()
.get(CONTENT_TYPE) .get(&HeaderName::CONTENT_TYPE)
.unwrap() .ok_or_else(|| invalid_data_error(format!("No Content-Type returned by {}", url)))?
.to_str() .to_str()
.unwrap(), .map_err(invalid_data_error)?
"text/plain" .to_owned();
); Ok((content_type, response.into_body()))
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert_eq!(buf, "Wikipedia in\r\n\r\nchunks.");
Ok(())
} }
} }

@ -1,13 +1,11 @@
use crate::error::{invalid_data_error, invalid_input_error};
use crate::model::NamedNode; use crate::model::NamedNode;
use crate::sparql::algebra::Query; use crate::sparql::algebra::Query;
use crate::sparql::error::EvaluationError; use crate::sparql::error::EvaluationError;
use crate::sparql::http::Client; use crate::sparql::http::Client;
use crate::sparql::model::QueryResults; use crate::sparql::model::QueryResults;
use crate::sparql::QueryResultsFormat; use crate::sparql::QueryResultsFormat;
use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use http::{Method, Request, StatusCode};
use std::error::Error; use std::error::Error;
use std::io::BufReader;
/// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE. /// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE.
/// ///
@ -113,42 +111,18 @@ impl ServiceHandler for SimpleServiceHandler {
service_name: NamedNode, service_name: NamedNode,
query: Query, query: Query,
) -> Result<QueryResults, EvaluationError> { ) -> Result<QueryResults, EvaluationError> {
let request = Request::builder() let (content_type, body) = self.client.post(
.method(Method::POST) service_name.as_str(),
.uri(service_name.as_str()) query.to_string().into_bytes(),
.header(CONTENT_TYPE, "application/sparql-query") "application/sparql-query",
.header(
ACCEPT,
"application/sparql-results+json, application/sparql-results+xml", "application/sparql-results+json, application/sparql-results+xml",
) )?;
.header(USER_AGENT, concat!("Oxigraph/", env!("CARGO_PKG_VERSION"))) let format = QueryResultsFormat::from_media_type(&content_type).ok_or_else(|| {
.body(Some(query.to_string().into_bytes()))
.map_err(invalid_input_error)?;
let response = self.client.request(&request)?;
if response.status() != StatusCode::OK {
return Err(EvaluationError::msg(format!(
"HTTP error code {} returned when querying service {}",
response.status(),
service_name
)));
}
let content_type = response
.headers()
.get(CONTENT_TYPE)
.ok_or_else(|| {
EvaluationError::msg(format!(
"No Content-Type header returned by {}",
service_name
))
})?
.to_str()
.map_err(invalid_data_error)?;
let format = QueryResultsFormat::from_media_type(content_type).ok_or_else(|| {
EvaluationError::msg(format!( EvaluationError::msg(format!(
"Unsupported Content-Type returned by {}: {}", "Unsupported Content-Type returned by {}: {}",
service_name, content_type service_name, content_type
)) ))
})?; })?;
Ok(QueryResults::read(response.into_body(), format)?) Ok(QueryResults::read(BufReader::new(body), format)?)
} }
} }

@ -1,4 +1,3 @@
use crate::error::{invalid_data_error, invalid_input_error};
use crate::io::GraphFormat; use crate::io::GraphFormat;
use crate::model::{ use crate::model::{
BlankNode as OxBlankNode, GraphName as OxGraphName, GraphNameRef, Literal as OxLiteral, BlankNode as OxBlankNode, GraphName as OxGraphName, GraphNameRef, Literal as OxLiteral,
@ -14,8 +13,6 @@ use crate::sparql::{EvaluationError, UpdateOptions};
use crate::storage::io::load_graph; use crate::storage::io::load_graph;
use crate::storage::numeric_encoder::{Decoder, EncodedTerm}; use crate::storage::numeric_encoder::{Decoder, EncodedTerm};
use crate::storage::Storage; use crate::storage::Storage;
use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use http::{Method, Request, StatusCode};
use oxiri::Iri; use oxiri::Iri;
use spargebra::algebra::{GraphPattern, GraphTarget}; use spargebra::algebra::{GraphPattern, GraphTarget};
use spargebra::term::{ use spargebra::term::{
@ -27,6 +24,7 @@ use spargebra::term::{
use spargebra::GraphUpdateOperation; use spargebra::GraphUpdateOperation;
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::io::BufReader;
use std::rc::Rc; use std::rc::Rc;
pub struct SimpleUpdateEvaluator<'a> { pub struct SimpleUpdateEvaluator<'a> {
@ -150,33 +148,11 @@ impl<'a> SimpleUpdateEvaluator<'a> {
} }
fn eval_load(&mut self, from: &NamedNode, to: &GraphName) -> Result<(), EvaluationError> { fn eval_load(&mut self, from: &NamedNode, to: &GraphName) -> Result<(), EvaluationError> {
let request = Request::builder() let (content_type, body) = self.client.get(
.method(Method::GET) &from.iri,
.uri(&from.iri)
.header(
ACCEPT,
"application/n-triples, text/turtle, application/rdf+xml", "application/n-triples, text/turtle, application/rdf+xml",
) )?;
.header(USER_AGENT, concat!("Oxigraph/", env!("CARGO_PKG_VERSION"))) let format = GraphFormat::from_media_type(&content_type).ok_or_else(|| {
.body(None)
.map_err(invalid_input_error)?;
let response = self.client.request(&request)?;
if response.status() != StatusCode::OK {
return Err(EvaluationError::msg(format!(
"HTTP error code {} returned when fetching {}",
response.status(),
from
)));
}
let content_type = response
.headers()
.get(CONTENT_TYPE)
.ok_or_else(|| {
EvaluationError::msg(format!("No Content-Type header returned by {}", from))
})?
.to_str()
.map_err(invalid_data_error)?;
let format = GraphFormat::from_media_type(content_type).ok_or_else(|| {
EvaluationError::msg(format!( EvaluationError::msg(format!(
"Unsupported Content-Type returned by {}: {}", "Unsupported Content-Type returned by {}: {}",
from, content_type from, content_type
@ -188,7 +164,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
}; };
load_graph( load_graph(
self.storage, self.storage,
response.into_body(), BufReader::new(body),
format, format,
to_graph_name, to_graph_name,
Some(&from.iri), Some(&from.iri),

Loading…
Cancel
Save