From 7aca89016db6cff382680d72a53746291ceb9318 Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 22 Dec 2020 14:45:37 +0100 Subject: [PATCH] Drops usages of async_std::spawn_blocking --- server/src/main.rs | 152 +++++++++++++++++++------------------------ testsuite/rdf-tests | 2 +- wikibase/src/main.rs | 109 +++++++++++++++---------------- 3 files changed, 122 insertions(+), 141 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index ee89e808..92a97a36 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -14,7 +14,7 @@ use async_std::future::Future; use async_std::io::Read; use async_std::net::{TcpListener, TcpStream}; use async_std::prelude::*; -use async_std::task::{block_on, spawn, spawn_blocking}; +use async_std::task::{block_on, spawn}; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use oxigraph::io::{DatasetFormat, GraphFormat}; use oxigraph::model::{GraphName, NamedNode, NamedOrBlankNode}; @@ -74,31 +74,21 @@ async fn handle_request(request: Request, store: Store) -> Result { ("/", Method::Post) => { if let Some(content_type) = request.content_type() { match if let Some(format) = GraphFormat::from_media_type(content_type.essence()) { - spawn_blocking(move || { - store.load_graph( - BufReader::new(SyncAsyncReader::from(request)), - format, - &GraphName::DefaultGraph, - None, - ) - }) + store.load_graph( + BufReader::new(SyncAsyncReader::from(request)), + format, + &GraphName::DefaultGraph, + None, + ) } else if let Some(format) = DatasetFormat::from_media_type(content_type.essence()) { - spawn_blocking(move || { - store.load_dataset( - BufReader::new(SyncAsyncReader::from(request)), - format, - None, - ) - }) + store.load_dataset(BufReader::new(SyncAsyncReader::from(request)), format, None) } else { return Ok(simple_response( StatusCode::UnsupportedMediaType, format!("No supported content Content-Type given: {}", content_type), )); - } - .await - { + } { Ok(()) => Response::new(StatusCode::NoContent), Err(error) => { return Err(bad_request(error)); @@ -226,62 +216,59 @@ async fn evaluate_sparql_query( named_graph_uris: Vec, request: Request, ) -> Result { - spawn_blocking(move || { - let mut query = Query::parse(&query, None).map_err(bad_request)?; - let default_graph_uris = default_graph_uris - .into_iter() - .map(|e| Ok(NamedNode::new(e)?.into())) - .collect::>>() - .map_err(bad_request)?; - let named_graph_uris = named_graph_uris - .into_iter() - .map(|e| Ok(NamedNode::new(e)?.into())) - .collect::>>() - .map_err(bad_request)?; - - if !default_graph_uris.is_empty() || !named_graph_uris.is_empty() { - query.dataset_mut().set_default_graph(default_graph_uris); - query - .dataset_mut() - .set_available_named_graphs(named_graph_uris); - } + let mut query = Query::parse(&query, None).map_err(bad_request)?; + let default_graph_uris = default_graph_uris + .into_iter() + .map(|e| Ok(NamedNode::new(e)?.into())) + .collect::>>() + .map_err(bad_request)?; + let named_graph_uris = named_graph_uris + .into_iter() + .map(|e| Ok(NamedNode::new(e)?.into())) + .collect::>>() + .map_err(bad_request)?; - let results = store.query(query)?; - //TODO: stream - if let QueryResults::Graph(_) = results { - let format = content_negotiation( - request, - &[ - GraphFormat::NTriples.media_type(), - GraphFormat::Turtle.media_type(), - GraphFormat::RdfXml.media_type(), - ], - GraphFormat::from_media_type, - )?; - let mut body = Vec::default(); - results.write_graph(&mut body, format)?; - let mut response = Response::from(body); - response.insert_header(headers::CONTENT_TYPE, format.media_type()); - Ok(response) - } else { - let format = content_negotiation( - request, - &[ - QueryResultsFormat::Xml.media_type(), - QueryResultsFormat::Json.media_type(), - QueryResultsFormat::Csv.media_type(), - QueryResultsFormat::Tsv.media_type(), - ], - QueryResultsFormat::from_media_type, - )?; - let mut body = Vec::default(); - results.write(&mut body, format)?; - let mut response = Response::from(body); - response.insert_header(headers::CONTENT_TYPE, format.media_type()); - Ok(response) - } - }) - .await + if !default_graph_uris.is_empty() || !named_graph_uris.is_empty() { + query.dataset_mut().set_default_graph(default_graph_uris); + query + .dataset_mut() + .set_available_named_graphs(named_graph_uris); + } + + let results = store.query(query)?; + //TODO: stream + if let QueryResults::Graph(_) = results { + let format = content_negotiation( + request, + &[ + GraphFormat::NTriples.media_type(), + GraphFormat::Turtle.media_type(), + GraphFormat::RdfXml.media_type(), + ], + GraphFormat::from_media_type, + )?; + let mut body = Vec::default(); + results.write_graph(&mut body, format)?; + let mut response = Response::from(body); + response.insert_header(headers::CONTENT_TYPE, format.media_type()); + Ok(response) + } else { + let format = content_negotiation( + request, + &[ + QueryResultsFormat::Xml.media_type(), + QueryResultsFormat::Json.media_type(), + QueryResultsFormat::Csv.media_type(), + QueryResultsFormat::Tsv.media_type(), + ], + QueryResultsFormat::from_media_type, + )?; + let mut body = Vec::default(); + results.write(&mut body, format)?; + let mut response = Response::from(body); + response.insert_header(headers::CONTENT_TYPE, format.media_type()); + Ok(response) + } } async fn evaluate_urlencoded_sparql_update(store: Store, encoded: Vec) -> Result { @@ -323,16 +310,13 @@ async fn evaluate_sparql_update( "using-graph-uri and using-named-graph-uri parameters are not supported yet", )); } - spawn_blocking(move || { - let update = Update::parse(&update, None).map_err(|e| { - let mut e = Error::from(e); - e.set_status(StatusCode::BadRequest); - e - })?; - store.update(update)?; - Ok(Response::new(StatusCode::NoContent)) - }) - .await + let update = Update::parse(&update, None).map_err(|e| { + let mut e = Error::from(e); + e.set_status(StatusCode::BadRequest); + e + })?; + store.update(update)?; + Ok(Response::new(StatusCode::NoContent)) } async fn http_server< diff --git a/testsuite/rdf-tests b/testsuite/rdf-tests index 56b69dbf..5979a875 160000 --- a/testsuite/rdf-tests +++ b/testsuite/rdf-tests @@ -1 +1 @@ -Subproject commit 56b69dbf6a858e09287743e601cc3ecc0577d05e +Subproject commit 5979a875356911ff0175a8f8a6c2d04edecdaac1 diff --git a/wikibase/src/main.rs b/wikibase/src/main.rs index f74a8407..4225f48b 100644 --- a/wikibase/src/main.rs +++ b/wikibase/src/main.rs @@ -14,7 +14,7 @@ use argh::FromArgs; use async_std::future::Future; use async_std::net::{TcpListener, TcpStream}; use async_std::prelude::*; -use async_std::task::{spawn, spawn_blocking}; +use async_std::task::spawn; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use oxigraph::io::GraphFormat; use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat}; @@ -79,16 +79,16 @@ pub async fn main() -> Result<()> { .collect::>(); let slot = args.slot.clone(); let repo = store.clone(); - spawn_blocking(move || { - let mut loader = WikibaseLoader::new( - repo, - &mediawiki_api, - &mediawiki_base_url, - &namespaces, - slot.as_deref(), - Duration::new(10, 0), - ) - .unwrap(); + let mut loader = WikibaseLoader::new( + repo, + &mediawiki_api, + &mediawiki_base_url, + &namespaces, + slot.as_deref(), + Duration::new(10, 0), + ) + .unwrap(); + spawn(async move { loader.initial_loading().unwrap(); loader.update_loop(); }); @@ -173,51 +173,48 @@ async fn evaluate_sparql_query( query: String, request: Request, ) -> Result { - spawn_blocking(move || { - //TODO: stream - let mut query = Query::parse(&query, None).map_err(|e| { - let mut e = Error::from(e); - e.set_status(StatusCode::BadRequest); - e - })?; - if query.dataset().is_default_dataset() { - query.dataset_mut().set_default_graph_as_union(); - } - let results = store.query(query)?; - if let QueryResults::Graph(_) = results { - let format = content_negotiation( - request, - &[ - GraphFormat::NTriples.media_type(), - GraphFormat::Turtle.media_type(), - GraphFormat::RdfXml.media_type(), - ], - GraphFormat::from_media_type, - )?; - let mut body = Vec::default(); - results.write_graph(&mut body, format)?; - let mut response = Response::from(body); - response.insert_header(headers::CONTENT_TYPE, format.media_type()); - Ok(response) - } else { - let format = content_negotiation( - request, - &[ - QueryResultsFormat::Xml.media_type(), - QueryResultsFormat::Json.media_type(), - QueryResultsFormat::Csv.media_type(), - QueryResultsFormat::Tsv.media_type(), - ], - QueryResultsFormat::from_media_type, - )?; - let mut body = Vec::default(); - results.write(&mut body, format)?; - let mut response = Response::from(body); - response.insert_header(headers::CONTENT_TYPE, format.media_type()); - Ok(response) - } - }) - .await + //TODO: stream + let mut query = Query::parse(&query, None).map_err(|e| { + let mut e = Error::from(e); + e.set_status(StatusCode::BadRequest); + e + })?; + if query.dataset().is_default_dataset() { + query.dataset_mut().set_default_graph_as_union(); + } + let results = store.query(query)?; + if let QueryResults::Graph(_) = results { + let format = content_negotiation( + request, + &[ + GraphFormat::NTriples.media_type(), + GraphFormat::Turtle.media_type(), + GraphFormat::RdfXml.media_type(), + ], + GraphFormat::from_media_type, + )?; + let mut body = Vec::default(); + results.write_graph(&mut body, format)?; + let mut response = Response::from(body); + response.insert_header(headers::CONTENT_TYPE, format.media_type()); + Ok(response) + } else { + let format = content_negotiation( + request, + &[ + QueryResultsFormat::Xml.media_type(), + QueryResultsFormat::Json.media_type(), + QueryResultsFormat::Csv.media_type(), + QueryResultsFormat::Tsv.media_type(), + ], + QueryResultsFormat::from_media_type, + )?; + let mut body = Vec::default(); + results.write(&mut body, format)?; + let mut response = Response::from(body); + response.insert_header(headers::CONTENT_TYPE, format.media_type()); + Ok(response) + } } async fn http_server<