From 311fffe71166350ec88aac4249e76644f8cd1c15 Mon Sep 17 00:00:00 2001 From: Tpt Date: Mon, 20 Sep 2021 08:16:07 +0200 Subject: [PATCH] Adds an option to set HTTP timeout --- lib/src/sparql/http/dummy.rs | 3 ++- lib/src/sparql/http/simple.rs | 20 ++++++++-------- lib/src/sparql/mod.rs | 43 +++++++++++++++++++++++++---------- lib/src/sparql/service.rs | 5 ++-- lib/src/sparql/update.rs | 5 ++-- 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/lib/src/sparql/http/dummy.rs b/lib/src/sparql/http/dummy.rs index 37497456..739cf8e1 100644 --- a/lib/src/sparql/http/dummy.rs +++ b/lib/src/sparql/http/dummy.rs @@ -3,11 +3,12 @@ use crate::error::invalid_input_error; use std::io; use std::io::{BufRead, Empty, Read, Result}; +use std::time::Duration; pub struct Client {} impl Client { - pub fn new() -> Self { + pub fn new(_timeout: Option) -> Self { Self {} } diff --git a/lib/src/sparql/http/simple.rs b/lib/src/sparql/http/simple.rs index aa278d05..bc69628a 100644 --- a/lib/src/sparql/http/simple.rs +++ b/lib/src/sparql/http/simple.rs @@ -1,15 +1,19 @@ use crate::error::{invalid_data_error, invalid_input_error}; use oxhttp::model::{Body, HeaderName, Method, Request}; -use std::io::{Read, Result}; +use std::io::Result; +use std::time::Duration; + +const USER_AGENT: &str = concat!("Oxigraph/", env!("CARGO_PKG_VERSION")); -#[derive(Default)] pub struct Client { client: oxhttp::Client, } impl Client { - pub fn new() -> Self { - Self::default() + pub fn new(timeout: Option) -> Self { + let mut client = oxhttp::Client::new(); + client.set_global_timeout(timeout); + Self { client } } pub fn get(&self, url: &str, accept: &str) -> Result<(String, Body)> { @@ -20,9 +24,7 @@ impl Client { ); request.headers_mut().append( HeaderName::USER_AGENT, - concat!("Oxigraph/", env!("CARGO_PKG_VERSION")) - .parse() - .map_err(invalid_input_error)?, + USER_AGENT.parse().map_err(invalid_input_error)?, ); let response = self.client.request(request)?; let content_type = response @@ -49,9 +51,7 @@ impl Client { ); request.headers_mut().append( HeaderName::USER_AGENT, - concat!("Oxigraph/", env!("CARGO_PKG_VERSION")) - .parse() - .map_err(invalid_input_error)?, + USER_AGENT.parse().map_err(invalid_input_error)?, ); request.headers_mut().append( HeaderName::CONTENT_TYPE, diff --git a/lib/src/sparql/mod.rs b/lib/src/sparql/mod.rs index 41c639f4..0042421e 100644 --- a/lib/src/sparql/mod.rs +++ b/lib/src/sparql/mod.rs @@ -34,7 +34,9 @@ use crate::storage::Storage; pub use spargebra::ParseError; use std::convert::TryInto; use std::rc::Rc; +use std::time::Duration; +#[allow(clippy::needless_pass_by_value)] pub(crate) fn evaluate_query( storage: Storage, query: impl TryInto>, @@ -50,7 +52,7 @@ pub(crate) fn evaluate_query( Ok(SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), - options.service_handler, + options.service_handler(), ) .evaluate_select_plan( &plan, @@ -69,7 +71,7 @@ pub(crate) fn evaluate_query( SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), - options.service_handler, + options.service_handler(), ) .evaluate_ask_plan(&plan) } @@ -84,7 +86,7 @@ pub(crate) fn evaluate_query( Ok(SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), - options.service_handler, + options.service_handler(), ) .evaluate_construct_plan(&plan, construct)) } @@ -95,7 +97,7 @@ pub(crate) fn evaluate_query( Ok(SimpleEvaluator::new( Rc::new(dataset), base_iri.map(Rc::new), - options.service_handler, + options.service_handler(), ) .evaluate_describe_plan(&plan)) } @@ -109,18 +111,16 @@ pub(crate) fn evaluate_query( /// a simple HTTP 1.1 client is used to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls. #[derive(Clone)] pub struct QueryOptions { - pub(crate) service_handler: Rc>, + pub(crate) service_handler: Option>>, + http_timeout: Option, } impl Default for QueryOptions { #[inline] fn default() -> Self { Self { - service_handler: if cfg!(feature = "http_client") { - Rc::new(service::SimpleServiceHandler::new()) - } else { - Rc::new(EmptyServiceHandler) - }, + service_handler: None, + http_timeout: None, } } } @@ -129,16 +129,35 @@ impl QueryOptions { /// Use a given [`ServiceHandler`] to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls. #[inline] pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self { - self.service_handler = Rc::new(ErrorConversionServiceHandler::wrap(service_handler)); + self.service_handler = Some(Rc::new(ErrorConversionServiceHandler::wrap( + service_handler, + ))); self } /// Disables the `SERVICE` calls #[inline] pub fn without_service_handler(mut self) -> Self { - self.service_handler = Rc::new(EmptyServiceHandler); + self.service_handler = Some(Rc::new(EmptyServiceHandler)); + self + } + + /// Sets a timeout for HTTP requests done during SPARQL evaluation + #[cfg(feature = "http_client")] + pub fn with_http_timeout(mut self, timeout: Duration) -> Self { + self.http_timeout = Some(timeout); self } + + fn service_handler(&self) -> Rc> { + self.service_handler.clone().unwrap_or_else(|| { + if cfg!(feature = "http_client") { + Rc::new(service::SimpleServiceHandler::new(self.http_timeout)) + } else { + Rc::new(EmptyServiceHandler) + } + }) + } } /// Options for SPARQL update evaluation diff --git a/lib/src/sparql/service.rs b/lib/src/sparql/service.rs index 539d5af8..1571842a 100644 --- a/lib/src/sparql/service.rs +++ b/lib/src/sparql/service.rs @@ -6,6 +6,7 @@ use crate::sparql::model::QueryResults; use crate::sparql::QueryResultsFormat; use std::error::Error; use std::io::BufReader; +use std::time::Duration; /// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE. /// @@ -96,9 +97,9 @@ pub struct SimpleServiceHandler { } impl SimpleServiceHandler { - pub fn new() -> Self { + pub fn new(http_timeout: Option) -> Self { Self { - client: Client::new(), + client: Client::new(http_timeout), } } } diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 9603ff79..cdd0647c 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -40,11 +40,12 @@ impl<'a> SimpleUpdateEvaluator<'a> { base_iri: Option>>, options: UpdateOptions, ) -> Self { + let client = Client::new(options.query_options.http_timeout); Self { storage, base_iri, options, - client: Client::new(), + client, } } @@ -119,7 +120,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { let evaluator = SimpleEvaluator::new( dataset.clone(), self.base_iri.clone(), - self.options.query_options.service_handler.clone(), + self.options.query_options.service_handler(), ); let mut bnodes = HashMap::new(); for tuple in evaluator.plan_evaluator(&plan)(EncodedTuple::with_capacity(variables.len())) {