From 0f8b4eddd2af58a1f1fbc380161b990f48cea17d Mon Sep 17 00:00:00 2001 From: Tpt Date: Sat, 23 May 2020 06:53:25 +0200 Subject: [PATCH] Upgrades to async-h1 2 --- server/Cargo.toml | 4 +-- server/src/main.rs | 69 +++++++++++++++--------------------------- wikibase/Cargo.toml | 6 ++-- wikibase/src/loader.rs | 46 +++++++++++++++++++++++----- wikibase/src/main.rs | 37 +++++++++------------- 5 files changed, 82 insertions(+), 80 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 635664f3..a5d47cf8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" [dependencies] argh = "0.1" async-std = { version = "1", features = ["attributes"] } -async-h1 = "1" -http-types = "1" +async-h1 = "2" +http-types = "2" oxigraph = { path = "../lib", features = ["rocksdb"] } url = "2" diff --git a/server/src/main.rs b/server/src/main.rs index 0999d788..3722978f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,7 +15,6 @@ use async_std::io::{BufRead, Read}; use async_std::net::{TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task::{block_on, spawn, spawn_blocking}; -use http_types::headers::HeaderName; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::{DatasetSyntax, FileSyntax, GraphSyntax, RocksDbStore}; @@ -44,7 +43,7 @@ pub async fn main() -> Result<()> { let store = RocksDbStore::open(args.file)?; println!("Listening for requests at http://{}", &args.bind); - http_server(args.bind, move |request| { + http_server(&args.bind, move |request| { handle_request(request, store.clone()) }) .await @@ -54,17 +53,17 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result { let mut response = Response::new(StatusCode::Ok); - response.append_header(headers::CONTENT_TYPE, "text/html")?; + response.append_header(headers::CONTENT_TYPE, "text/html"); response.set_body(HTML_ROOT_PAGE); response } ("/", Method::Post) => { if let Some(content_type) = request.content_type() { - match if let Some(format) = GraphSyntax::from_mime_type(essence(&content_type)) { + match if let Some(format) = GraphSyntax::from_mime_type(content_type.essence()) { spawn_blocking(move || { store.load_graph(SyncAsyncBufReader::from(request), format, None, None) }) - } else if let Some(format) = DatasetSyntax::from_mime_type(essence(&content_type)) { + } else if let Some(format) = DatasetSyntax::from_mime_type(content_type.essence()) { spawn_blocking(move || { store.load_dataset(SyncAsyncBufReader::from(request), format, None) }) @@ -97,7 +96,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result { if let Some(content_type) = request.content_type() { - if essence(&content_type) == "application/sparql-query" { + if content_type.essence() == "application/sparql-query" { let mut buffer = String::new(); let mut request = request; request @@ -106,7 +105,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result Result Response::new(StatusCode::NotFound), }; - response.append_header("Server", SERVER)?; + response.append_header(headers::SERVER, SERVER); Ok(response) } -/// TODO: bad hack to overcome http_types limitations -fn essence(mime: &Mime) -> &str { - mime.essence().split(';').next().unwrap_or("") -} - fn simple_response(status: StatusCode, body: impl Into) -> Response { let mut response = Response::new(status); response.set_body(body); @@ -183,7 +177,7 @@ async fn evaluate_sparql_query( )?; let mut response = Response::from(results.write_graph(Vec::default(), format)?); - response.insert_header(headers::CONTENT_TYPE, format.media_type())?; + response.insert_header(headers::CONTENT_TYPE, format.media_type()); Ok(response) } else { let format = content_negotiation( @@ -194,7 +188,7 @@ async fn evaluate_sparql_query( ], )?; let mut response = Response::from(results.write(Vec::default(), format)?); - response.insert_header(headers::CONTENT_TYPE, format.media_type())?; + response.insert_header(headers::CONTENT_TYPE, format.media_type()); Ok(response) } }) @@ -205,15 +199,14 @@ async fn http_server< F: Clone + Send + Sync + 'static + Fn(Request) -> Fut, Fut: Send + Future>, >( - host: String, + host: &str, handle: F, ) -> Result<()> { async fn accept Fut, Fut: Future>>( - addr: String, stream: TcpStream, handle: F, ) -> Result<()> { - async_h1::accept(&addr, stream, |request| async { + async_h1::accept(stream, |request| async { Ok(match handle(request).await { Ok(result) => result, Err(error) => simple_response(error.status(), error.to_string()), @@ -222,14 +215,13 @@ async fn http_server< .await } - let listener = TcpListener::bind(&host).await?; + let listener = TcpListener::bind(host).await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { - let stream = stream?.clone(); //TODO: clone stream? + let stream = stream?; let handle = handle.clone(); - let addr = format!("http://{}", host); spawn(async { - if let Err(err) = accept(addr, stream, handle).await { + if let Err(err) = accept(stream, handle).await { eprintln!("{}", err); }; }); @@ -239,9 +231,8 @@ async fn http_server< fn content_negotiation(request: Request, supported: &[&str]) -> Result { let header = request - .header(&HeaderName::from_str("Accept").unwrap()) - .and_then(|h| h.last()) - .map(|h| h.as_str().trim()) + .header(headers::ACCEPT) + .map(|h| h.last().as_str().trim()) .unwrap_or(""); let supported: Vec = supported .iter() @@ -255,7 +246,7 @@ fn content_negotiation(request: Request, supported: &[&str]) -> R for possible in header.split(',') { let possible = Mime::from_str(possible.trim())?; let score = if let Some(q) = possible.param("q") { - f32::from_str(q)? + f32::from_str(&q.to_string())? } else { 1. }; @@ -274,7 +265,7 @@ fn content_negotiation(request: Request, supported: &[&str]) -> R } } - F::from_mime_type(essence(result)) + F::from_mime_type(result.essence()) .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) } @@ -301,7 +292,7 @@ impl std::io::BufRead for SyncAsyncBufReader { unimplemented!() } - fn consume(&mut self, amt: usize) { + fn consume(&mut self, _: usize) { unimplemented!() } @@ -336,9 +327,7 @@ mod tests { #[test] fn post_file() { let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap()); - request - .insert_header("Content-Type", "text/turtle") - .unwrap(); + request.insert_header("Content-Type", "text/turtle"); request.set_body(" ."); exec(request, StatusCode::NoContent) } @@ -346,9 +335,7 @@ mod tests { #[test] fn post_wrong_file() { let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap()); - request - .insert_header("Content-Type", "text/turtle") - .unwrap(); + request.insert_header("Content-Type", "text/turtle"); request.set_body(""); exec(request, StatusCode::BadRequest) } @@ -356,7 +343,7 @@ mod tests { #[test] fn post_unsupported_file() { let mut request = Request::new(Method::Post, Url::parse("http://localhost/").unwrap()); - request.insert_header("Content-Type", "text/plain").unwrap(); + request.insert_header("Content-Type", "text/plain"); exec(request, StatusCode::UnsupportedMediaType) } @@ -396,9 +383,7 @@ mod tests { #[test] fn post_query() { let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); - request - .insert_header("Content-Type", "application/sparql-query") - .unwrap(); + request.insert_header("Content-Type", "application/sparql-query"); request.set_body("SELECT * WHERE { ?s ?p ?o }"); exec(request, StatusCode::Ok) } @@ -406,9 +391,7 @@ mod tests { #[test] fn post_bad_query() { let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); - request - .insert_header("Content-Type", "application/sparql-query") - .unwrap(); + request.insert_header("Content-Type", "application/sparql-query"); request.set_body("SELECT"); exec(request, StatusCode::BadRequest) } @@ -416,9 +399,7 @@ mod tests { #[test] fn post_unknown_query() { let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); - request - .insert_header("Content-Type", "application/sparql-todo") - .unwrap(); + request.insert_header("Content-Type", "application/sparql-todo"); request.set_body("SELECT"); exec(request, StatusCode::UnsupportedMediaType) } diff --git a/wikibase/Cargo.toml b/wikibase/Cargo.toml index 7d1fbebc..7d692eee 100644 --- a/wikibase/Cargo.toml +++ b/wikibase/Cargo.toml @@ -12,11 +12,11 @@ edition = "2018" [dependencies] argh = "0.1" +async-native-tls = "0.3" async-std = { version = "1", features = ["attributes"] } -async-h1 = "1" +async-h1 = "2" chrono = "0.4" -http-client = { version = "2.0", features = ["h1_client"] } -http-types = "1" +http-types = "2" oxigraph = { path = "../lib", features = ["rocksdb"] } serde_json = "1" url = "2" \ No newline at end of file diff --git a/wikibase/src/loader.rs b/wikibase/src/loader.rs index 1a3f4596..f7e029af 100644 --- a/wikibase/src/loader.rs +++ b/wikibase/src/loader.rs @@ -1,10 +1,9 @@ use crate::SERVER; +use async_std::net::TcpStream; use async_std::prelude::*; use async_std::task::block_on; use chrono::{DateTime, Datelike, Utc}; -use http_client::h1::H1Client; -use http_client::HttpClient; -use http_types::{Method, Request, Result}; +use http_types::{headers, Error, Method, Request, Response, Result, StatusCode}; use oxigraph::model::NamedNode; use oxigraph::{GraphSyntax, RocksDbStore}; use serde_json::Value; @@ -18,7 +17,6 @@ pub struct WikibaseLoader { store: RocksDbStore, api_url: Url, entity_data_url: Url, - client: H1Client, namespaces: Vec, slot: Option, frequency: Duration, @@ -38,7 +36,6 @@ impl WikibaseLoader { store, api_url: Url::parse(api_url)?, entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?, - client: H1Client::new(), namespaces: namespaces.to_vec(), slot: slot.map(|t| t.to_owned()), start: Utc::now(), @@ -218,16 +215,49 @@ impl WikibaseLoader { } let url = url.join(&("?".to_owned() + &query_serializer.finish()))?; let mut request = Request::new(Method::Get, url); - request.append_header("user-agent", SERVER)?; - let response = self.client.send(request); + request.append_header(headers::SERVER, SERVER); + block_on(async { - let mut response = response.await?; + let mut response = self.request(request).await?; let mut buffer = Vec::new(); response.read_to_end(&mut buffer).await?; Ok(buffer) }) } + async fn request(&self, request: Request) -> Result { + let addr = request + .url() + .socket_addrs(|| None)? + .into_iter() + .next() + .ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing valid address"))?; + + Ok(match request.url().scheme() { + "http" => { + let stream = TcpStream::connect(addr).await?; + async_h1::connect(stream, request).await + } + "https" => { + let stream = async_native_tls::connect( + request + .url() + .host_str() + .ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing host"))?, + TcpStream::connect(addr).await?, + ) + .await?; + async_h1::connect(stream, request).await + } + _ => { + return Err(Error::from_str( + StatusCode::BadRequest, + "missing valid address", + )) + } + }?) + } + fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> { let graph_name = NamedNode::parse(uri)?.into(); self.store.transaction(|transaction| { diff --git a/wikibase/src/main.rs b/wikibase/src/main.rs index 7a74d66f..7cdc6c7a 100644 --- a/wikibase/src/main.rs +++ b/wikibase/src/main.rs @@ -15,7 +15,6 @@ use async_std::future::Future; use async_std::net::{TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task::{spawn, spawn_blocking}; -use http_types::headers::HeaderName; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::{FileSyntax, GraphSyntax, RocksDbStore}; @@ -95,7 +94,7 @@ pub async fn main() -> Result<()> { println!("Listening for requests at http://{}", &args.bind); - http_server(args.bind, move |request| { + http_server(&args.bind, move |request| { handle_request(request, store.clone()) }) .await @@ -113,7 +112,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result { if let Some(content_type) = request.content_type() { - if essence(&content_type) == "application/sparql-query" { + if content_type.essence() == "application/sparql-query" { let mut buffer = String::new(); let mut request = request; request @@ -122,7 +121,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result Result Response::new(StatusCode::NotFound), }; - response.append_header("Server", SERVER)?; + response.append_header("Server", SERVER); Ok(response) } -/// TODO: bad hack to overcome http_types limitations -fn essence(mime: &Mime) -> &str { - mime.essence().split(';').next().unwrap_or("") -} - fn simple_response(status: StatusCode, body: impl Into) -> Response { let mut response = Response::new(status); response.set_body(body); @@ -199,7 +193,7 @@ async fn evaluate_sparql_query( )?; let mut response = Response::from(results.write_graph(Vec::default(), format)?); - response.insert_header(headers::CONTENT_TYPE, format.media_type())?; + response.insert_header(headers::CONTENT_TYPE, format.media_type()); Ok(response) } else { let format = content_negotiation( @@ -210,7 +204,7 @@ async fn evaluate_sparql_query( ], )?; let mut response = Response::from(results.write(Vec::default(), format)?); - response.insert_header(headers::CONTENT_TYPE, format.media_type())?; + response.insert_header(headers::CONTENT_TYPE, format.media_type()); Ok(response) } }) @@ -221,15 +215,14 @@ async fn http_server< F: Clone + Send + Sync + 'static + Fn(Request) -> Fut, Fut: Send + Future>, >( - host: String, + host: &str, handle: F, ) -> Result<()> { async fn accept Fut, Fut: Future>>( - addr: String, stream: TcpStream, handle: F, ) -> Result<()> { - async_h1::accept(&addr, stream, |request| async { + async_h1::accept(stream, |request| async { Ok(match handle(request).await { Ok(result) => result, Err(error) => simple_response(error.status(), error.to_string()), @@ -241,11 +234,10 @@ async fn http_server< let listener = TcpListener::bind(&host).await?; let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { - let stream = stream?.clone(); //TODO: clone stream? + let stream = stream?; let handle = handle.clone(); - let addr = format!("http://{}", host); spawn(async { - if let Err(err) = accept(addr, stream, handle).await { + if let Err(err) = accept(stream, handle).await { eprintln!("{}", err); }; }); @@ -255,9 +247,8 @@ async fn http_server< fn content_negotiation(request: Request, supported: &[&str]) -> Result { let header = request - .header(&HeaderName::from_str("Accept").unwrap()) - .and_then(|h| h.last()) - .map(|h| h.as_str().trim()) + .header(headers::ACCEPT) + .map(|h| h.last().as_str().trim()) .unwrap_or(""); let supported: Vec = supported .iter() @@ -271,7 +262,7 @@ fn content_negotiation(request: Request, supported: &[&str]) -> R for possible in header.split(',') { let possible = Mime::from_str(possible.trim())?; let score = if let Some(q) = possible.param("q") { - f32::from_str(q)? + f32::from_str(&q.to_string())? } else { 1. }; @@ -290,6 +281,6 @@ fn content_negotiation(request: Request, supported: &[&str]) -> R } } - F::from_mime_type(essence(result)) + F::from_mime_type(result.essence()) .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) }