Drops usages of async_std::spawn_blocking

pull/70/head
Tpt 4 years ago
parent a228d266c5
commit 7aca89016d
  1. 152
      server/src/main.rs
  2. 2
      testsuite/rdf-tests
  3. 109
      wikibase/src/main.rs

@ -14,7 +14,7 @@ use async_std::future::Future;
use async_std::io::Read; use async_std::io::Read;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::{block_on, spawn, spawn_blocking}; use async_std::task::{block_on, spawn};
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::io::{DatasetFormat, GraphFormat}; use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::{GraphName, NamedNode, NamedOrBlankNode}; use oxigraph::model::{GraphName, NamedNode, NamedOrBlankNode};
@ -74,31 +74,21 @@ async fn handle_request(request: Request, store: Store) -> Result<Response> {
("/", Method::Post) => { ("/", Method::Post) => {
if let Some(content_type) = request.content_type() { if let Some(content_type) = request.content_type() {
match if let Some(format) = GraphFormat::from_media_type(content_type.essence()) { match if let Some(format) = GraphFormat::from_media_type(content_type.essence()) {
spawn_blocking(move || { store.load_graph(
store.load_graph( BufReader::new(SyncAsyncReader::from(request)),
BufReader::new(SyncAsyncReader::from(request)), format,
format, &GraphName::DefaultGraph,
&GraphName::DefaultGraph, None,
None, )
)
})
} else if let Some(format) = DatasetFormat::from_media_type(content_type.essence()) } else if let Some(format) = DatasetFormat::from_media_type(content_type.essence())
{ {
spawn_blocking(move || { store.load_dataset(BufReader::new(SyncAsyncReader::from(request)), format, None)
store.load_dataset(
BufReader::new(SyncAsyncReader::from(request)),
format,
None,
)
})
} else { } else {
return Ok(simple_response( return Ok(simple_response(
StatusCode::UnsupportedMediaType, StatusCode::UnsupportedMediaType,
format!("No supported content Content-Type given: {}", content_type), format!("No supported content Content-Type given: {}", content_type),
)); ));
} } {
.await
{
Ok(()) => Response::new(StatusCode::NoContent), Ok(()) => Response::new(StatusCode::NoContent),
Err(error) => { Err(error) => {
return Err(bad_request(error)); return Err(bad_request(error));
@ -226,62 +216,59 @@ async fn evaluate_sparql_query(
named_graph_uris: Vec<String>, named_graph_uris: Vec<String>,
request: Request, request: Request,
) -> Result<Response> { ) -> Result<Response> {
spawn_blocking(move || { let mut query = Query::parse(&query, None).map_err(bad_request)?;
let mut query = Query::parse(&query, None).map_err(bad_request)?; let default_graph_uris = default_graph_uris
let default_graph_uris = default_graph_uris .into_iter()
.into_iter() .map(|e| Ok(NamedNode::new(e)?.into()))
.map(|e| Ok(NamedNode::new(e)?.into())) .collect::<Result<Vec<GraphName>>>()
.collect::<Result<Vec<GraphName>>>() .map_err(bad_request)?;
.map_err(bad_request)?; let named_graph_uris = named_graph_uris
let named_graph_uris = named_graph_uris .into_iter()
.into_iter() .map(|e| Ok(NamedNode::new(e)?.into()))
.map(|e| Ok(NamedNode::new(e)?.into())) .collect::<Result<Vec<NamedOrBlankNode>>>()
.collect::<Result<Vec<NamedOrBlankNode>>>() .map_err(bad_request)?;
.map_err(bad_request)?;
if !default_graph_uris.is_empty() || !named_graph_uris.is_empty() {
query.dataset_mut().set_default_graph(default_graph_uris);
query
.dataset_mut()
.set_available_named_graphs(named_graph_uris);
}
let results = store.query(query)?; if !default_graph_uris.is_empty() || !named_graph_uris.is_empty() {
//TODO: stream query.dataset_mut().set_default_graph(default_graph_uris);
if let QueryResults::Graph(_) = results { query
let format = content_negotiation( .dataset_mut()
request, .set_available_named_graphs(named_graph_uris);
&[ }
GraphFormat::NTriples.media_type(),
GraphFormat::Turtle.media_type(), let results = store.query(query)?;
GraphFormat::RdfXml.media_type(), //TODO: stream
], if let QueryResults::Graph(_) = results {
GraphFormat::from_media_type, let format = content_negotiation(
)?; request,
let mut body = Vec::default(); &[
results.write_graph(&mut body, format)?; GraphFormat::NTriples.media_type(),
let mut response = Response::from(body); GraphFormat::Turtle.media_type(),
response.insert_header(headers::CONTENT_TYPE, format.media_type()); GraphFormat::RdfXml.media_type(),
Ok(response) ],
} else { GraphFormat::from_media_type,
let format = content_negotiation( )?;
request, let mut body = Vec::default();
&[ results.write_graph(&mut body, format)?;
QueryResultsFormat::Xml.media_type(), let mut response = Response::from(body);
QueryResultsFormat::Json.media_type(), response.insert_header(headers::CONTENT_TYPE, format.media_type());
QueryResultsFormat::Csv.media_type(), Ok(response)
QueryResultsFormat::Tsv.media_type(), } else {
], let format = content_negotiation(
QueryResultsFormat::from_media_type, request,
)?; &[
let mut body = Vec::default(); QueryResultsFormat::Xml.media_type(),
results.write(&mut body, format)?; QueryResultsFormat::Json.media_type(),
let mut response = Response::from(body); QueryResultsFormat::Csv.media_type(),
response.insert_header(headers::CONTENT_TYPE, format.media_type()); QueryResultsFormat::Tsv.media_type(),
Ok(response) ],
} QueryResultsFormat::from_media_type,
}) )?;
.await let mut body = Vec::default();
results.write(&mut body, format)?;
let mut response = Response::from(body);
response.insert_header(headers::CONTENT_TYPE, format.media_type());
Ok(response)
}
} }
async fn evaluate_urlencoded_sparql_update(store: Store, encoded: Vec<u8>) -> Result<Response> { async fn evaluate_urlencoded_sparql_update(store: Store, encoded: Vec<u8>) -> Result<Response> {
@ -323,16 +310,13 @@ async fn evaluate_sparql_update(
"using-graph-uri and using-named-graph-uri parameters are not supported yet", "using-graph-uri and using-named-graph-uri parameters are not supported yet",
)); ));
} }
spawn_blocking(move || { let update = Update::parse(&update, None).map_err(|e| {
let update = Update::parse(&update, None).map_err(|e| { let mut e = Error::from(e);
let mut e = Error::from(e); e.set_status(StatusCode::BadRequest);
e.set_status(StatusCode::BadRequest); e
e })?;
})?; store.update(update)?;
store.update(update)?; Ok(Response::new(StatusCode::NoContent))
Ok(Response::new(StatusCode::NoContent))
})
.await
} }
async fn http_server< async fn http_server<

@ -1 +1 @@
Subproject commit 56b69dbf6a858e09287743e601cc3ecc0577d05e Subproject commit 5979a875356911ff0175a8f8a6c2d04edecdaac1

@ -14,7 +14,7 @@ use argh::FromArgs;
use async_std::future::Future; use async_std::future::Future;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::{spawn, spawn_blocking}; use async_std::task::spawn;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::io::GraphFormat; use oxigraph::io::GraphFormat;
use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat}; use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat};
@ -79,16 +79,16 @@ pub async fn main() -> Result<()> {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let slot = args.slot.clone(); let slot = args.slot.clone();
let repo = store.clone(); let repo = store.clone();
spawn_blocking(move || { let mut loader = WikibaseLoader::new(
let mut loader = WikibaseLoader::new( repo,
repo, &mediawiki_api,
&mediawiki_api, &mediawiki_base_url,
&mediawiki_base_url, &namespaces,
&namespaces, slot.as_deref(),
slot.as_deref(), Duration::new(10, 0),
Duration::new(10, 0), )
) .unwrap();
.unwrap(); spawn(async move {
loader.initial_loading().unwrap(); loader.initial_loading().unwrap();
loader.update_loop(); loader.update_loop();
}); });
@ -173,51 +173,48 @@ async fn evaluate_sparql_query(
query: String, query: String,
request: Request, request: Request,
) -> Result<Response> { ) -> Result<Response> {
spawn_blocking(move || { //TODO: stream
//TODO: stream let mut query = Query::parse(&query, None).map_err(|e| {
let mut query = Query::parse(&query, None).map_err(|e| { let mut e = Error::from(e);
let mut e = Error::from(e); e.set_status(StatusCode::BadRequest);
e.set_status(StatusCode::BadRequest); e
e })?;
})?; if query.dataset().is_default_dataset() {
if query.dataset().is_default_dataset() { query.dataset_mut().set_default_graph_as_union();
query.dataset_mut().set_default_graph_as_union(); }
} let results = store.query(query)?;
let results = store.query(query)?; if let QueryResults::Graph(_) = results {
if let QueryResults::Graph(_) = results { let format = content_negotiation(
let format = content_negotiation( request,
request, &[
&[ GraphFormat::NTriples.media_type(),
GraphFormat::NTriples.media_type(), GraphFormat::Turtle.media_type(),
GraphFormat::Turtle.media_type(), GraphFormat::RdfXml.media_type(),
GraphFormat::RdfXml.media_type(), ],
], GraphFormat::from_media_type,
GraphFormat::from_media_type, )?;
)?; let mut body = Vec::default();
let mut body = Vec::default(); results.write_graph(&mut body, format)?;
results.write_graph(&mut body, format)?; let mut response = Response::from(body);
let mut response = Response::from(body); response.insert_header(headers::CONTENT_TYPE, format.media_type());
response.insert_header(headers::CONTENT_TYPE, format.media_type()); Ok(response)
Ok(response) } else {
} else { let format = content_negotiation(
let format = content_negotiation( request,
request, &[
&[ QueryResultsFormat::Xml.media_type(),
QueryResultsFormat::Xml.media_type(), QueryResultsFormat::Json.media_type(),
QueryResultsFormat::Json.media_type(), QueryResultsFormat::Csv.media_type(),
QueryResultsFormat::Csv.media_type(), QueryResultsFormat::Tsv.media_type(),
QueryResultsFormat::Tsv.media_type(), ],
], QueryResultsFormat::from_media_type,
QueryResultsFormat::from_media_type, )?;
)?; let mut body = Vec::default();
let mut body = Vec::default(); results.write(&mut body, format)?;
results.write(&mut body, format)?; let mut response = Response::from(body);
let mut response = Response::from(body); response.insert_header(headers::CONTENT_TYPE, format.media_type());
response.insert_header(headers::CONTENT_TYPE, format.media_type()); Ok(response)
Ok(response) }
}
})
.await
} }
async fn http_server< async fn http_server<

Loading…
Cancel
Save