Adds an option to set HTTP timeout

pull/171/head
Tpt 3 years ago
parent d722edd4af
commit 311fffe711
  1. 3
      lib/src/sparql/http/dummy.rs
  2. 20
      lib/src/sparql/http/simple.rs
  3. 43
      lib/src/sparql/mod.rs
  4. 5
      lib/src/sparql/service.rs
  5. 5
      lib/src/sparql/update.rs

@ -3,11 +3,12 @@
use crate::error::invalid_input_error; use crate::error::invalid_input_error;
use std::io; use std::io;
use std::io::{BufRead, Empty, Read, Result}; use std::io::{BufRead, Empty, Read, Result};
use std::time::Duration;
pub struct Client {} pub struct Client {}
impl Client { impl Client {
pub fn new() -> Self { pub fn new(_timeout: Option<Duration>) -> Self {
Self {} Self {}
} }

@ -1,15 +1,19 @@
use crate::error::{invalid_data_error, invalid_input_error}; use crate::error::{invalid_data_error, invalid_input_error};
use oxhttp::model::{Body, HeaderName, Method, Request}; use oxhttp::model::{Body, HeaderName, Method, Request};
use std::io::{Read, Result}; use std::io::Result;
use std::time::Duration;
const USER_AGENT: &str = concat!("Oxigraph/", env!("CARGO_PKG_VERSION"));
#[derive(Default)]
pub struct Client { pub struct Client {
client: oxhttp::Client, client: oxhttp::Client,
} }
impl Client { impl Client {
pub fn new() -> Self { pub fn new(timeout: Option<Duration>) -> Self {
Self::default() let mut client = oxhttp::Client::new();
client.set_global_timeout(timeout);
Self { client }
} }
pub fn get(&self, url: &str, accept: &str) -> Result<(String, Body)> { pub fn get(&self, url: &str, accept: &str) -> Result<(String, Body)> {
@ -20,9 +24,7 @@ impl Client {
); );
request.headers_mut().append( request.headers_mut().append(
HeaderName::USER_AGENT, HeaderName::USER_AGENT,
concat!("Oxigraph/", env!("CARGO_PKG_VERSION")) USER_AGENT.parse().map_err(invalid_input_error)?,
.parse()
.map_err(invalid_input_error)?,
); );
let response = self.client.request(request)?; let response = self.client.request(request)?;
let content_type = response let content_type = response
@ -49,9 +51,7 @@ impl Client {
); );
request.headers_mut().append( request.headers_mut().append(
HeaderName::USER_AGENT, HeaderName::USER_AGENT,
concat!("Oxigraph/", env!("CARGO_PKG_VERSION")) USER_AGENT.parse().map_err(invalid_input_error)?,
.parse()
.map_err(invalid_input_error)?,
); );
request.headers_mut().append( request.headers_mut().append(
HeaderName::CONTENT_TYPE, HeaderName::CONTENT_TYPE,

@ -34,7 +34,9 @@ use crate::storage::Storage;
pub use spargebra::ParseError; pub use spargebra::ParseError;
use std::convert::TryInto; use std::convert::TryInto;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration;
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn evaluate_query( pub(crate) fn evaluate_query(
storage: Storage, storage: Storage,
query: impl TryInto<Query, Error = impl Into<EvaluationError>>, query: impl TryInto<Query, Error = impl Into<EvaluationError>>,
@ -50,7 +52,7 @@ pub(crate) fn evaluate_query(
Ok(SimpleEvaluator::new( Ok(SimpleEvaluator::new(
Rc::new(dataset), Rc::new(dataset),
base_iri.map(Rc::new), base_iri.map(Rc::new),
options.service_handler, options.service_handler(),
) )
.evaluate_select_plan( .evaluate_select_plan(
&plan, &plan,
@ -69,7 +71,7 @@ pub(crate) fn evaluate_query(
SimpleEvaluator::new( SimpleEvaluator::new(
Rc::new(dataset), Rc::new(dataset),
base_iri.map(Rc::new), base_iri.map(Rc::new),
options.service_handler, options.service_handler(),
) )
.evaluate_ask_plan(&plan) .evaluate_ask_plan(&plan)
} }
@ -84,7 +86,7 @@ pub(crate) fn evaluate_query(
Ok(SimpleEvaluator::new( Ok(SimpleEvaluator::new(
Rc::new(dataset), Rc::new(dataset),
base_iri.map(Rc::new), base_iri.map(Rc::new),
options.service_handler, options.service_handler(),
) )
.evaluate_construct_plan(&plan, construct)) .evaluate_construct_plan(&plan, construct))
} }
@ -95,7 +97,7 @@ pub(crate) fn evaluate_query(
Ok(SimpleEvaluator::new( Ok(SimpleEvaluator::new(
Rc::new(dataset), Rc::new(dataset),
base_iri.map(Rc::new), base_iri.map(Rc::new),
options.service_handler, options.service_handler(),
) )
.evaluate_describe_plan(&plan)) .evaluate_describe_plan(&plan))
} }
@ -109,18 +111,16 @@ pub(crate) fn evaluate_query(
/// a simple HTTP 1.1 client is used to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls. /// a simple HTTP 1.1 client is used to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls.
#[derive(Clone)] #[derive(Clone)]
pub struct QueryOptions { pub struct QueryOptions {
pub(crate) service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>, pub(crate) service_handler: Option<Rc<dyn ServiceHandler<Error = EvaluationError>>>,
http_timeout: Option<Duration>,
} }
impl Default for QueryOptions { impl Default for QueryOptions {
#[inline] #[inline]
fn default() -> Self { fn default() -> Self {
Self { Self {
service_handler: if cfg!(feature = "http_client") { service_handler: None,
Rc::new(service::SimpleServiceHandler::new()) http_timeout: None,
} else {
Rc::new(EmptyServiceHandler)
},
} }
} }
} }
@ -129,16 +129,35 @@ impl QueryOptions {
/// Use a given [`ServiceHandler`] to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls. /// Use a given [`ServiceHandler`] to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls.
#[inline] #[inline]
pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self { pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self {
self.service_handler = Rc::new(ErrorConversionServiceHandler::wrap(service_handler)); self.service_handler = Some(Rc::new(ErrorConversionServiceHandler::wrap(
service_handler,
)));
self self
} }
/// Disables the `SERVICE` calls /// Disables the `SERVICE` calls
#[inline] #[inline]
pub fn without_service_handler(mut self) -> Self { pub fn without_service_handler(mut self) -> Self {
self.service_handler = Rc::new(EmptyServiceHandler); self.service_handler = Some(Rc::new(EmptyServiceHandler));
self
}
/// Sets a timeout for HTTP requests done during SPARQL evaluation
#[cfg(feature = "http_client")]
pub fn with_http_timeout(mut self, timeout: Duration) -> Self {
self.http_timeout = Some(timeout);
self self
} }
fn service_handler(&self) -> Rc<dyn ServiceHandler<Error = EvaluationError>> {
self.service_handler.clone().unwrap_or_else(|| {
if cfg!(feature = "http_client") {
Rc::new(service::SimpleServiceHandler::new(self.http_timeout))
} else {
Rc::new(EmptyServiceHandler)
}
})
}
} }
/// Options for SPARQL update evaluation /// Options for SPARQL update evaluation

@ -6,6 +6,7 @@ use crate::sparql::model::QueryResults;
use crate::sparql::QueryResultsFormat; use crate::sparql::QueryResultsFormat;
use std::error::Error; use std::error::Error;
use std::io::BufReader; use std::io::BufReader;
use std::time::Duration;
/// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE. /// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE.
/// ///
@ -96,9 +97,9 @@ pub struct SimpleServiceHandler {
} }
impl SimpleServiceHandler { impl SimpleServiceHandler {
pub fn new() -> Self { pub fn new(http_timeout: Option<Duration>) -> Self {
Self { Self {
client: Client::new(), client: Client::new(http_timeout),
} }
} }
} }

@ -40,11 +40,12 @@ impl<'a> SimpleUpdateEvaluator<'a> {
base_iri: Option<Rc<Iri<String>>>, base_iri: Option<Rc<Iri<String>>>,
options: UpdateOptions, options: UpdateOptions,
) -> Self { ) -> Self {
let client = Client::new(options.query_options.http_timeout);
Self { Self {
storage, storage,
base_iri, base_iri,
options, options,
client: Client::new(), client,
} }
} }
@ -119,7 +120,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
let evaluator = SimpleEvaluator::new( let evaluator = SimpleEvaluator::new(
dataset.clone(), dataset.clone(),
self.base_iri.clone(), self.base_iri.clone(),
self.options.query_options.service_handler.clone(), self.options.query_options.service_handler(),
); );
let mut bnodes = HashMap::new(); let mut bnodes = HashMap::new();
for tuple in evaluator.plan_evaluator(&plan)(EncodedTuple::with_capacity(variables.len())) { for tuple in evaluator.plan_evaluator(&plan)(EncodedTuple::with_capacity(variables.len())) {

Loading…
Cancel
Save