Implements a Simple HTTP client for SPARQL federation and SPARQL LOAD

pull/58/head
Tpt 4 years ago
parent d121aed47b
commit 23cc09f481
  1. 4
      lib/Cargo.toml
  2. 2
      lib/src/model/blank_node.rs
  3. 2
      lib/src/model/literal.rs
  4. 3
      lib/src/sparql/eval.rs
  5. 19
      lib/src/sparql/http/dummy.rs
  6. 9
      lib/src/sparql/http/mod.rs
  7. 425
      lib/src/sparql/http/simple.rs
  8. 108
      lib/src/sparql/mod.rs
  9. 150
      lib/src/sparql/service.rs
  10. 84
      lib/src/sparql/update.rs
  11. 2
      lib/src/store/binary_encoder.rs
  12. 1
      lib/src/store/memory.rs
  13. 4
      lib/src/store/mod.rs
  14. 1
      lib/src/store/rocksdb.rs
  15. 1
      lib/src/store/sled.rs
  16. 2
      python/Cargo.toml
  17. 1
      python/src/memory_store.rs
  18. 1
      python/src/sled_store.rs
  19. 5
      python/tests/test_store.py
  20. 3
      server/Cargo.toml
  21. 82
      server/src/main.rs
  22. 6
      testsuite/tests/sparql.rs
  23. 2
      wikibase/Cargo.toml
  24. 82
      wikibase/src/main.rs

@ -18,6 +18,7 @@ all-features = true
[features] [features]
default = [] default = []
sophia = ["sophia_api"] sophia = ["sophia_api"]
http_client = ["httparse", "native-tls"]
[dependencies] [dependencies]
rocksdb = { version = "0.15", optional = true } rocksdb = { version = "0.15", optional = true }
@ -40,6 +41,9 @@ peg = "0.6"
siphasher = "0.3" siphasher = "0.3"
lasso = {version="0.3", features=["multi-threaded"]} lasso = {version="0.3", features=["multi-threaded"]}
sophia_api = { version = "0.6.2", optional = true } sophia_api = { version = "0.6.2", optional = true }
http = "0.2"
httparse = { version = "1", optional = true }
native-tls = { version = "0.2", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3" js-sys = "0.3"

@ -351,7 +351,7 @@ impl fmt::Display for BlankNodeIdParseError {
impl Error for BlankNodeIdParseError {} impl Error for BlankNodeIdParseError {}
#[cfg(test)] #[cfg(test)]
mod test { mod tests {
use super::*; use super::*;
#[test] #[test]

@ -603,7 +603,7 @@ impl PartialEq<LiteralRef<'_>> for Literal {
} }
#[cfg(test)] #[cfg(test)]
mod test { mod tests {
use super::*; use super::*;
#[test] #[test]

@ -5,8 +5,9 @@ use crate::model::{BlankNode, LiteralRef, NamedNodeRef};
use crate::sparql::algebra::{DatasetSpec, GraphPattern, QueryVariants}; use crate::sparql::algebra::{DatasetSpec, GraphPattern, QueryVariants};
use crate::sparql::error::EvaluationError; use crate::sparql::error::EvaluationError;
use crate::sparql::model::*; use crate::sparql::model::*;
use crate::sparql::parser::Query;
use crate::sparql::plan::*; use crate::sparql::plan::*;
use crate::sparql::{Query, ServiceHandler}; use crate::sparql::service::ServiceHandler;
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::small_string::SmallString; use crate::store::small_string::SmallString;
use crate::store::ReadableEncodedStore; use crate::store::ReadableEncodedStore;

@ -0,0 +1,19 @@
//! Simple HTTP client
use crate::error::invalid_input_error;
use http::{Request, Response};
use std::io;
pub struct Client {}
impl Client {
pub fn new() -> Self {
Self {}
}
pub fn request(&self, _request: &Request<Option<Vec<u8>>>) -> io::Result<Response<Vec<u8>>> {
Err(invalid_input_error(
"HTTP client is not available. Enable the feature 'simple_http'",
))
}
}

@ -0,0 +1,9 @@
#[cfg(not(feature = "http_client"))]
mod dummy;
#[cfg(feature = "http_client")]
mod simple;
#[cfg(not(feature = "http_client"))]
pub use dummy::Client;
#[cfg(feature = "http_client")]
pub use simple::Client;

@ -0,0 +1,425 @@
//! Simple HTTP client
use crate::error::{invalid_data_error, invalid_input_error};
use http::header::{CONNECTION, CONTENT_LENGTH, HOST, TRANSFER_ENCODING};
use http::{Request, Response, Version};
use httparse::Status;
use native_tls::TlsConnector;
use std::cmp::min;
use std::convert::TryInto;
use std::io;
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpStream;
pub struct Client {}
impl Client {
pub fn new() -> Self {
Self {}
}
pub fn request(
&self,
request: &Request<Option<Vec<u8>>>,
) -> io::Result<Response<Box<dyn BufRead>>> {
let scheme = request
.uri()
.scheme_str()
.ok_or_else(|| invalid_input_error("No host provided"))?;
let port = if let Some(port) = request.uri().port_u16() {
port
} else {
match scheme {
"http" => 80,
"https" => 443,
_ => {
return Err(invalid_input_error(format!(
"No port provided for scheme '{}'",
scheme
)))
}
}
};
let host = request
.uri()
.host()
.ok_or_else(|| invalid_input_error("No host provided"))?;
match scheme {
"http" => {
let mut stream = TcpStream::connect((host, port))?;
self.encode(request, &mut stream)?;
self.decode(stream)
}
"https" => {
let connector =
TlsConnector::new().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let stream = TcpStream::connect((host, port))?;
let mut stream = connector
.connect(host, stream)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.encode(request, &mut stream)?;
self.decode(stream)
}
_ => Err(invalid_input_error(format!(
"Not supported URL scheme: {}",
scheme
))),
}
}
fn encode(
&self,
request: &Request<Option<Vec<u8>>>,
mut writer: &mut impl Write,
) -> io::Result<()> {
if request.headers().contains_key(CONTENT_LENGTH) {
return Err(invalid_input_error(
"content-length header is set by the client library",
));
}
if request.headers().contains_key(HOST) {
return Err(invalid_input_error(
"host header is set by the client library",
));
}
if request.headers().contains_key(CONNECTION) {
return Err(invalid_input_error(
"connection header is set by the client library",
));
}
if let Some(query) = request.uri().query() {
write!(
&mut writer,
"{} {}?{} HTTP/1.1\r\n",
request.method(),
request.uri().path(),
query
)?;
} else {
write!(
&mut writer,
"{} {} HTTP/1.1\r\n",
request.method(),
request.uri().path()
)?;
}
// host
let host = request
.uri()
.host()
.ok_or_else(|| invalid_input_error("No host provided"))?;
if let Some(port) = request.uri().port() {
write!(writer, "host: {}:{}\r\n", request.uri(), port)
} else {
write!(writer, "host: {}\r\n", host)
}?;
// connection
write!(writer, "connection: close\r\n")?;
// headers
for (name, value) in request.headers() {
write!(writer, "{}: ", name.as_str())?;
writer.write_all(value.as_bytes())?;
write!(writer, "\r\n")?;
}
// body with content-length
if let Some(payload) = request.body() {
write!(writer, "content-length: {}\r\n\r\n", payload.len())?;
writer.write_all(payload)?;
} else {
write!(writer, "\r\n")?;
}
Ok(())
}
fn decode<'a>(&self, reader: impl Read + 'a) -> io::Result<Response<Box<dyn BufRead + 'a>>> {
let mut reader = BufReader::new(reader);
// Let's read the headers
let mut buffer = Vec::new();
let mut headers = [httparse::EMPTY_HEADER; 1024];
let mut parsed_response = httparse::Response::new(&mut headers);
loop {
if reader.read_until(b'\n', &mut buffer)? == 0 {
return Err(invalid_data_error("Empty HTTP response"));
}
if buffer.len() > 8 * 1024 {
return Err(invalid_data_error("The headers size should fit in 8kb"));
}
if buffer.ends_with(b"\r\n\r\n") || buffer.ends_with(b"\n\n") {
break; //end of buffer
}
}
if parsed_response
.parse(&buffer)
.map_err(invalid_data_error)?
.is_partial()
{
return Err(invalid_input_error(
"Partial HTTP headers containing two line jumps",
));
}
// Let's build the response
let mut response = Response::builder()
.status(
parsed_response
.code
.ok_or_else(|| invalid_data_error("No status code in the HTTP response"))?,
)
.version(match parsed_response.version {
Some(0) => Version::HTTP_10,
Some(1) => Version::HTTP_11,
Some(id) => {
return Err(invalid_data_error(format!(
"Unsupported HTTP response version: 1.{}",
id
)))
}
None => return Err(invalid_data_error("No HTTP version in the HTTP response")),
});
for header in parsed_response.headers {
response = response.header(header.name, header.value);
}
let content_length = response.headers_ref().and_then(|h| h.get(CONTENT_LENGTH));
let transfer_encoding = response
.headers_ref()
.and_then(|h| h.get(TRANSFER_ENCODING));
if transfer_encoding.is_some() && content_length.is_some() {
return Err(invalid_data_error(
"Transfer-Encoding and Content-Length should not be set at the same time",
));
}
let body: Box<dyn BufRead> = if let Some(content_length) = content_length {
let len = content_length
.to_str()
.map_err(invalid_data_error)?
.parse::<u64>()
.map_err(invalid_data_error)?;
Box::new(reader.take(len))
} else if let Some(transfer_encoding) = transfer_encoding {
let transfer_encoding = transfer_encoding.to_str().map_err(invalid_data_error)?;
if transfer_encoding.eq_ignore_ascii_case("chunked") {
buffer.clear();
Box::new(BufReader::new(ChunkedResponse {
reader,
buffer,
is_start: true,
chunk_position: 1,
chunk_size: 1,
}))
} else {
return Err(invalid_data_error(format!(
"Transfer-Encoding: {} is not supported",
transfer_encoding
)));
}
} else {
Box::new(io::empty())
};
response.body(body).map_err(invalid_data_error)
}
}
struct ChunkedResponse<R: BufRead> {
reader: R,
buffer: Vec<u8>,
is_start: bool,
chunk_position: usize,
chunk_size: usize,
}
impl<R: BufRead> Read for ChunkedResponse<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
// In case we still have data
if self.chunk_position < self.chunk_size {
let inner_buf = self.reader.fill_buf()?;
let size = min(
min(buf.len(), inner_buf.len()),
self.chunk_size - self.chunk_position,
);
buf[..size].copy_from_slice(&inner_buf[..size]);
self.reader.consume(size);
self.chunk_position += size;
return Ok(size); // Won't be 0 if there is still some inner buffer
}
if self.chunk_size == 0 {
return Ok(0); // We know it's the end
}
if self.is_start {
self.is_start = false;
} else {
// chunk end
self.buffer.clear();
self.reader.read_until(b'\n', &mut self.buffer)?;
if self.buffer != b"\r\n" && self.buffer != b"\n" {
return Err(invalid_data_error("Invalid chunked element end"));
}
}
// We load a new chunk
self.buffer.clear();
self.reader.read_until(b'\n', &mut self.buffer)?;
self.chunk_position = 0;
self.chunk_size = if let Ok(Status::Complete((read, chunk_size))) =
httparse::parse_chunk_size(&self.buffer)
{
if read != self.buffer.len() {
return Err(invalid_data_error("Chuncked header containing a line jump"));
}
chunk_size.try_into().map_err(invalid_data_error)?
} else {
return Err(invalid_data_error("Invalid chuncked header"));
};
if self.chunk_size == 0 {
// we read the trailers
loop {
if self.reader.read_until(b'\n', &mut self.buffer)? == 0 {
return Err(invalid_data_error("Missing chunked encoding end"));
}
if self.buffer.len() > 8 * 1024 {
return Err(invalid_data_error("The trailers size should fit in 8kb"));
}
if self.buffer.ends_with(b"\r\n\r\n") || self.buffer.ends_with(b"\n\n") {
break; //end of buffer
}
}
return Ok(0);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use http::header::{ACCEPT, CONTENT_TYPE};
use http::{Method, StatusCode};
use std::io::Cursor;
use std::str;
#[test]
fn encode_get_request() -> io::Result<()> {
let mut buffer = Vec::new();
Client::new().encode(
&Request::builder()
.method(Method::GET)
.uri("http://example.com/foo/bar?query#fragment")
.header(ACCEPT, "application/json")
.body(None)
.unwrap(),
&mut buffer,
)?;
assert_eq!(
str::from_utf8(&buffer).unwrap(),
"GET /foo/bar?query HTTP/1.1\r\nhost: example.com\r\nconnection: close\r\naccept: application/json\r\n\r\n"
);
Ok(())
}
#[test]
fn encode_post_request() -> io::Result<()> {
let mut buffer = Vec::new();
Client::new().encode(
&Request::builder()
.method(Method::POST)
.uri("http://example.com/foo/bar?query#fragment")
.header(ACCEPT, "application/json")
.body(Some(b"testbody".to_vec()))
.unwrap(),
&mut buffer,
)?;
assert_eq!(
str::from_utf8(&buffer).unwrap(),
"POST /foo/bar?query HTTP/1.1\r\nhost: example.com\r\nconnection: close\r\naccept: application/json\r\ncontent-length: 8\r\n\r\ntestbody"
);
Ok(())
}
#[test]
fn decode_response_without_payload() -> io::Result<()> {
let response = Client::new()
.decode(Cursor::new("HTTP/1.1 404 Not Found\r\n\r\n"))
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert!(buf.is_empty());
Ok(())
}
#[test]
fn decode_response_with_fixed_payload() -> io::Result<()> {
let response = Client::new().decode(Cursor::new(
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ncontent-length:8\r\n\r\ntestbody",
))?;
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get(CONTENT_TYPE)
.unwrap()
.to_str()
.unwrap(),
"text/plain"
);
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert_eq!(buf, "testbody");
Ok(())
}
#[test]
fn decode_response_with_chunked_payload() -> io::Result<()> {
let response = Client::new().decode(Cursor::new(
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding:chunked\r\n\r\n4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n",
))?;
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get(CONTENT_TYPE)
.unwrap()
.to_str()
.unwrap(),
"text/plain"
);
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert_eq!(buf, "Wikipedia in\r\n\r\nchunks.");
Ok(())
}
#[test]
fn decode_response_with_trailer() -> io::Result<()> {
let response = Client::new().decode(Cursor::new(
"HTTP/1.1 200 OK\r\ncontent-type: text/plain\r\ntransfer-encoding:chunked\r\n\r\n4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\ntest: foo\r\n\r\n",
))?;
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(
response
.headers()
.get(CONTENT_TYPE)
.unwrap()
.to_str()
.unwrap(),
"text/plain"
);
let mut buf = String::new();
response.into_body().read_to_string(&mut buf)?;
assert_eq!(buf, "Wikipedia in\r\n\r\nchunks.");
Ok(())
}
}

@ -7,15 +7,17 @@ mod csv_results;
mod dataset; mod dataset;
mod error; mod error;
mod eval; mod eval;
mod http;
mod json_results; mod json_results;
mod model; mod model;
mod parser; mod parser;
mod plan; mod plan;
mod plan_builder; mod plan_builder;
mod service;
mod update; mod update;
mod xml_results; mod xml_results;
use crate::model::{GraphName, NamedNode, NamedOrBlankNode}; use crate::model::{GraphName, NamedOrBlankNode};
use crate::sparql::algebra::QueryVariants; use crate::sparql::algebra::QueryVariants;
use crate::sparql::dataset::DatasetView; use crate::sparql::dataset::DatasetView;
pub use crate::sparql::error::EvaluationError; pub use crate::sparql::error::EvaluationError;
@ -30,11 +32,15 @@ pub use crate::sparql::parser::ParseError;
pub use crate::sparql::parser::{Query, Update}; pub use crate::sparql::parser::{Query, Update};
use crate::sparql::plan::{PlanNode, TripleTemplate}; use crate::sparql::plan::{PlanNode, TripleTemplate};
use crate::sparql::plan_builder::PlanBuilder; use crate::sparql::plan_builder::PlanBuilder;
pub use crate::sparql::service::ServiceHandler;
use crate::sparql::service::{
EmptyServiceHandler, ErrorConversionServiceHandler, SimpleServiceHandler,
};
use crate::sparql::update::SimpleUpdateEvaluator; use crate::sparql::update::SimpleUpdateEvaluator;
use crate::store::numeric_encoder::{StrContainer, StrEncodingAware}; use crate::store::numeric_encoder::{StrContainer, StrEncodingAware};
use crate::store::{ReadableEncodedStore, WritableEncodedStore}; use crate::store::{ReadableEncodedStore, StoreOrParseError, WritableEncodedStore};
use std::convert::TryInto; use std::convert::TryInto;
use std::error::Error; use std::io;
use std::rc::Rc; use std::rc::Rc;
/// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) /// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/)
@ -219,90 +225,21 @@ impl QueryOptions {
self self
} }
/// Use a given [`ServiceHandler`](trait.ServiceHandler.html) to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls. /// Use a simple HTTP 1.1 client built into Oxigraph to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls.
///
/// Requires the `"http_client"` optional feature.
#[inline] #[inline]
pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self { #[cfg(feature = "http_client")]
self.service_handler = Rc::new(ErrorConversionServiceHandler { pub fn with_simple_service_handler(mut self) -> Self {
handler: service_handler, self.service_handler = Rc::new(SimpleServiceHandler::new());
});
self self
} }
}
/// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE.
///
/// Should be given to [`QueryOptions`](struct.QueryOptions.html#method.with_service_handler)
/// before evaluating a SPARQL query that uses SERVICE calls.
///
/// ```
/// use oxigraph::MemoryStore;
/// use oxigraph::model::*;
/// use oxigraph::sparql::{QueryOptions, QueryResults, ServiceHandler, Query, EvaluationError};
///
/// #[derive(Default)]
/// struct TestServiceHandler {
/// store: MemoryStore
/// }
///
/// impl ServiceHandler for TestServiceHandler {
/// type Error = EvaluationError;
///
/// fn handle(&self,service_name: NamedNode, query: Query) -> Result<QueryResults,EvaluationError> {
/// if service_name == "http://example.com/service" {
/// self.store.query(query, QueryOptions::default())
/// } else {
/// panic!()
/// }
/// }
/// }
///
/// let store = MemoryStore::new();
/// let service = TestServiceHandler::default();
/// let ex = NamedNode::new("http://example.com")?;
/// service.store.insert(Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
///
/// if let QueryResults::Solutions(mut solutions) = store.query(
/// "SELECT ?s WHERE { SERVICE <http://example.com/service> { ?s ?p ?o } }",
/// QueryOptions::default().with_service_handler(service)
/// )? {
/// assert_eq!(solutions.next().unwrap()?.get("s"), Some(&ex.into()));
/// }
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub trait ServiceHandler {
type Error: Error + Send + Sync + 'static;
/// Evaluates a [`Query`](struct.Query.html) against a given service identified by a [`NamedNode`](../model/struct.NamedNode.html).
fn handle(&self, service_name: NamedNode, query: Query) -> Result<QueryResults, Self::Error>;
}
struct EmptyServiceHandler;
impl ServiceHandler for EmptyServiceHandler { /// Use a given [`ServiceHandler`](trait.ServiceHandler.html) to execute [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE calls.
type Error = EvaluationError; #[inline]
pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self {
fn handle(&self, _: NamedNode, _: Query) -> Result<QueryResults, EvaluationError> { self.service_handler = Rc::new(ErrorConversionServiceHandler::wrap(service_handler));
Err(EvaluationError::msg( self
"The SERVICE feature is not implemented",
))
}
}
struct ErrorConversionServiceHandler<S: ServiceHandler> {
handler: S,
}
impl<S: ServiceHandler> ServiceHandler for ErrorConversionServiceHandler<S> {
type Error = EvaluationError;
fn handle(
&self,
service_name: NamedNode,
query: Query,
) -> Result<QueryResults, EvaluationError> {
self.handler
.handle(service_name, query)
.map_err(EvaluationError::wrap)
} }
} }
@ -313,7 +250,10 @@ pub(crate) fn evaluate_update<
read: R, read: R,
write: &mut W, write: &mut W,
update: &Update, update: &Update,
) -> Result<(), EvaluationError> { ) -> Result<(), EvaluationError>
where
io::Error: From<StoreOrParseError<W::Error>>,
{
SimpleUpdateEvaluator::new( SimpleUpdateEvaluator::new(
read, read,
write, write,

@ -0,0 +1,150 @@
use crate::error::{invalid_data_error, invalid_input_error};
use crate::model::NamedNode;
use crate::sparql::error::EvaluationError;
use crate::sparql::http::Client;
use crate::sparql::model::QueryResults;
use crate::sparql::parser::Query;
use crate::sparql::QueryResultsFormat;
use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use http::{Method, Request, StatusCode};
use std::error::Error;
/// Handler for [SPARQL 1.1 Federated Query](https://www.w3.org/TR/sparql11-federated-query/) SERVICE.
///
/// Should be given to [`QueryOptions`](struct.QueryOptions.html#method.with_service_handler)
/// before evaluating a SPARQL query that uses SERVICE calls.
///
/// ```
/// use oxigraph::MemoryStore;
/// use oxigraph::model::*;
/// use oxigraph::sparql::{QueryOptions, QueryResults, ServiceHandler, Query, EvaluationError};
///
/// #[derive(Default)]
/// struct TestServiceHandler {
/// store: MemoryStore
/// }
///
/// impl ServiceHandler for TestServiceHandler {
/// type Error = EvaluationError;
///
/// fn handle(&self,service_name: NamedNode, query: Query) -> Result<QueryResults,EvaluationError> {
/// if service_name == "http://example.com/service" {
/// self.store.query(query, QueryOptions::default())
/// } else {
/// panic!()
/// }
/// }
/// }
///
/// let store = MemoryStore::new();
/// let service = TestServiceHandler::default();
/// let ex = NamedNode::new("http://example.com")?;
/// service.store.insert(Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
///
/// if let QueryResults::Solutions(mut solutions) = store.query(
/// "SELECT ?s WHERE { SERVICE <http://example.com/service> { ?s ?p ?o } }",
/// QueryOptions::default().with_service_handler(service)
/// )? {
/// assert_eq!(solutions.next().unwrap()?.get("s"), Some(&ex.into()));
/// }
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub trait ServiceHandler {
type Error: Error + Send + Sync + 'static;
/// Evaluates a [`Query`](struct.Query.html) against a given service identified by a [`NamedNode`](../model/struct.NamedNode.html).
fn handle(&self, service_name: NamedNode, query: Query) -> Result<QueryResults, Self::Error>;
}
pub struct EmptyServiceHandler;
impl ServiceHandler for EmptyServiceHandler {
type Error = EvaluationError;
fn handle(&self, _: NamedNode, _: Query) -> Result<QueryResults, EvaluationError> {
Err(EvaluationError::msg(
"The SERVICE feature is not implemented",
))
}
}
pub struct ErrorConversionServiceHandler<S: ServiceHandler> {
handler: S,
}
impl<S: ServiceHandler> ErrorConversionServiceHandler<S> {
pub fn wrap(handler: S) -> Self {
Self { handler }
}
}
impl<S: ServiceHandler> ServiceHandler for ErrorConversionServiceHandler<S> {
type Error = EvaluationError;
fn handle(
&self,
service_name: NamedNode,
query: Query,
) -> Result<QueryResults, EvaluationError> {
self.handler
.handle(service_name, query)
.map_err(EvaluationError::wrap)
}
}
pub struct SimpleServiceHandler {
client: Client,
}
impl SimpleServiceHandler {
pub fn new() -> Self {
Self {
client: Client::new(),
}
}
}
impl ServiceHandler for SimpleServiceHandler {
type Error = EvaluationError;
fn handle(
&self,
service_name: NamedNode,
query: Query,
) -> Result<QueryResults, EvaluationError> {
let request = Request::builder()
.method(Method::POST)
.uri(service_name.as_str())
.header(CONTENT_TYPE, "application/sparql-query")
.header(ACCEPT, QueryResultsFormat::Xml.media_type())
.header(USER_AGENT, concat!("Oxigraph/", env!("CARGO_PKG_VERSION")))
.body(Some(query.to_string().into_bytes()))
.map_err(invalid_input_error)?;
let response = self.client.request(&request)?;
if response.status() != StatusCode::OK {
return Err(EvaluationError::msg(format!(
"HTTP error code {} returned when querying service {}",
response.status(),
service_name
)));
}
let content_type = response
.headers()
.get(CONTENT_TYPE)
.ok_or_else(|| {
EvaluationError::msg(format!(
"No Content-Type header returned by {}",
service_name
))
})?
.to_str()
.map_err(invalid_data_error)?;
let format = QueryResultsFormat::from_media_type(content_type).ok_or_else(|| {
EvaluationError::msg(format!(
"Unsupported Content-Type returned by {}: {}",
service_name, content_type
))
})?;
Ok(QueryResults::read(response.into_body(), format)?)
}
}

@ -1,19 +1,26 @@
use crate::model::{BlankNode, Term}; use crate::error::{invalid_data_error, invalid_input_error};
use crate::io::GraphFormat;
use crate::model::{BlankNode, GraphNameRef, NamedNode, Term};
use crate::sparql::algebra::{ use crate::sparql::algebra::{
DatasetSpec, GraphPattern, GraphTarget, GraphUpdateOperation, NamedNodeOrVariable, QuadPattern, DatasetSpec, GraphPattern, GraphTarget, GraphUpdateOperation, NamedNodeOrVariable, QuadPattern,
TermOrVariable, TermOrVariable,
}; };
use crate::sparql::dataset::{DatasetStrId, DatasetView}; use crate::sparql::dataset::{DatasetStrId, DatasetView};
use crate::sparql::eval::SimpleEvaluator; use crate::sparql::eval::SimpleEvaluator;
use crate::sparql::http::Client;
use crate::sparql::plan::EncodedTuple; use crate::sparql::plan::EncodedTuple;
use crate::sparql::plan_builder::PlanBuilder; use crate::sparql::plan_builder::PlanBuilder;
use crate::sparql::{EvaluationError, ServiceHandler, Variable}; use crate::sparql::service::ServiceHandler;
use crate::sparql::{EvaluationError, Variable};
use crate::store::numeric_encoder::{ use crate::store::numeric_encoder::{
EncodedQuad, EncodedTerm, ReadEncoder, StrContainer, StrLookup, WriteEncoder, EncodedQuad, EncodedTerm, ReadEncoder, StrContainer, StrLookup, WriteEncoder,
}; };
use crate::store::{ReadableEncodedStore, WritableEncodedStore}; use crate::store::{load_graph, ReadableEncodedStore, StoreOrParseError, WritableEncodedStore};
use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use http::{Method, Request, StatusCode};
use oxiri::Iri; use oxiri::Iri;
use std::collections::HashMap; use std::collections::HashMap;
use std::io;
use std::rc::Rc; use std::rc::Rc;
pub(crate) struct SimpleUpdateEvaluator<'a, R, W> { pub(crate) struct SimpleUpdateEvaluator<'a, R, W> {
@ -21,6 +28,7 @@ pub(crate) struct SimpleUpdateEvaluator<'a, R, W> {
write: &'a mut W, write: &'a mut W,
base_iri: Option<Rc<Iri<String>>>, base_iri: Option<Rc<Iri<String>>>,
service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>, service_handler: Rc<dyn ServiceHandler<Error = EvaluationError>>,
client: Client,
} }
impl< impl<
@ -28,6 +36,8 @@ impl<
R: ReadableEncodedStore + Clone + 'static, R: ReadableEncodedStore + Clone + 'static,
W: StrContainer<StrId = R::StrId> + WritableEncodedStore<StrId = R::StrId> + 'a, W: StrContainer<StrId = R::StrId> + WritableEncodedStore<StrId = R::StrId> + 'a,
> SimpleUpdateEvaluator<'a, R, W> > SimpleUpdateEvaluator<'a, R, W>
where
io::Error: From<StoreOrParseError<W::Error>>,
{ {
pub fn new( pub fn new(
read: R, read: R,
@ -40,6 +50,7 @@ impl<
write, write,
base_iri, base_iri,
service_handler, service_handler,
client: Client::new(),
} }
} }
@ -60,9 +71,17 @@ impl<
using, using,
algebra, algebra,
} => self.eval_delete_insert(delete, insert, using, algebra), } => self.eval_delete_insert(delete, insert, using, algebra),
GraphUpdateOperation::Load { .. } => Err(EvaluationError::msg( GraphUpdateOperation::Load { silent, from, to } => {
"SPARQL UPDATE LOAD operation is not implemented yet", if let Err(error) = self.eval_load(from, to) {
)), if *silent {
Ok(())
} else {
Err(error)
}
} else {
Ok(())
}
}
GraphUpdateOperation::Clear { graph, .. } => self.eval_clear(graph), GraphUpdateOperation::Clear { graph, .. } => self.eval_clear(graph),
GraphUpdateOperation::Create { .. } => Ok(()), GraphUpdateOperation::Create { .. } => Ok(()),
GraphUpdateOperation::Drop { graph, .. } => self.eval_clear(graph), GraphUpdateOperation::Drop { graph, .. } => self.eval_clear(graph),
@ -154,6 +173,59 @@ impl<
Ok(()) Ok(())
} }
fn eval_load(
&mut self,
from: &NamedNode,
to: &Option<NamedNode>,
) -> Result<(), EvaluationError> {
let request = Request::builder()
.method(Method::GET)
.uri(from.as_str())
.header(
ACCEPT,
"application/n-triples, text/turtle, application/rdf+xml",
)
.header(USER_AGENT, concat!("Oxigraph/", env!("CARGO_PKG_VERSION")))
.body(None)
.map_err(invalid_input_error)?;
let response = self.client.request(&request)?;
if response.status() != StatusCode::OK {
return Err(EvaluationError::msg(format!(
"HTTP error code {} returned when fetching {}",
response.status(),
from
)));
}
let content_type = response
.headers()
.get(CONTENT_TYPE)
.ok_or_else(|| {
EvaluationError::msg(format!("No Content-Type header returned by {}", from))
})?
.to_str()
.map_err(invalid_data_error)?;
let format = GraphFormat::from_media_type(content_type).ok_or_else(|| {
EvaluationError::msg(format!(
"Unsupported Content-Type returned by {}: {}",
from, content_type
))
})?;
let to_graph_name = if let Some(graph_name) = to {
graph_name.as_ref().into()
} else {
GraphNameRef::DefaultGraph
};
load_graph(
self.write,
response.into_body(),
format,
to_graph_name,
Some(from.as_str()),
)
.map_err(io::Error::from)?;
Ok(())
}
fn eval_clear(&mut self, graph: &GraphTarget) -> Result<(), EvaluationError> { fn eval_clear(&mut self, graph: &GraphTarget) -> Result<(), EvaluationError> {
match graph { match graph {
GraphTarget::NamedNode(graph) => { GraphTarget::NamedNode(graph) => {

@ -656,7 +656,7 @@ pub fn write_term(sink: &mut Vec<u8>, term: EncodedTerm) {
} }
#[cfg(test)] #[cfg(test)]
mod test { mod tests {
use super::*; use super::*;
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use std::collections::HashMap; use std::collections::HashMap;

@ -238,7 +238,6 @@ impl MemoryStore {
/// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/). /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/).
/// ///
/// The [`LOAD` operation](https://www.w3.org/TR/sparql11-update/#load) is not supported yet.
/// The store does not track the existence of empty named graphs. /// The store does not track the existence of empty named graphs.
/// This method has no ACID guarantees. /// This method has no ACID guarantees.
/// ///

@ -50,7 +50,7 @@ pub(crate) trait WritableEncodedStore: StrEncodingAware {
fn remove_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>; fn remove_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>;
} }
fn load_graph<S: WritableEncodedStore + StrContainer>( pub(crate) fn load_graph<S: WritableEncodedStore + StrContainer>(
store: &mut S, store: &mut S,
reader: impl BufRead, reader: impl BufRead,
format: GraphFormat, format: GraphFormat,
@ -158,7 +158,7 @@ fn dump_dataset(
writer.finish() writer.finish()
} }
enum StoreOrParseError<S> { pub(crate) enum StoreOrParseError<S> {
Store(S), Store(S),
Parse(io::Error), Parse(io::Error),
} }

@ -217,7 +217,6 @@ impl RocksDbStore {
/// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/). /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/).
/// ///
/// The [`LOAD` operation](https://www.w3.org/TR/sparql11-update/#load) is not supported yet.
/// The store does not track the existence of empty named graphs. /// The store does not track the existence of empty named graphs.
/// This method has no ACID guarantees. /// This method has no ACID guarantees.
/// ///

@ -208,7 +208,6 @@ impl SledStore {
/// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/). /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/).
/// ///
/// The [`LOAD` operation](https://www.w3.org/TR/sparql11-update/#load) is not supported yet.
/// The store does not track the existence of empty named graphs. /// The store does not track the existence of empty named graphs.
/// This method has no ACID guarantees. /// This method has no ACID guarantees.
/// ///

@ -15,7 +15,7 @@ name = "pyoxigraph"
doctest = false doctest = false
[dependencies] [dependencies]
oxigraph = {version = "0.1.1", path="../lib", features = ["sled"]} oxigraph = {version = "0.1.1", path="../lib", features = ["sled", "http_client"]}
pyo3 = {version = "0.11", features = ["extension-module"]} pyo3 = {version = "0.11", features = ["extension-module"]}
[package.metadata.maturin] [package.metadata.maturin]

@ -185,7 +185,6 @@ impl PyMemoryStore {
/// :type update: str /// :type update: str
/// :raises SyntaxError: if the provided update is invalid /// :raises SyntaxError: if the provided update is invalid
/// ///
/// The `LOAD operation <https://www.w3.org/TR/sparql11-update/#load>`_ is not supported yet.
/// The store does not track the existence of empty named graphs. /// The store does not track the existence of empty named graphs.
/// This method has no ACID guarantees. /// This method has no ACID guarantees.
/// ///

@ -200,7 +200,6 @@ impl PySledStore {
/// :raises SyntaxError: if the provided update is invalid /// :raises SyntaxError: if the provided update is invalid
/// :raises IOError: if an I/O error happens while reading the store /// :raises IOError: if an I/O error happens while reading the store
/// ///
/// The `LOAD operation <https://www.w3.org/TR/sparql11-update/#load>`_ is not supported yet.
/// The store does not track the existence of empty named graphs. /// The store does not track the existence of empty named graphs.
/// This method has no ACID guarantees. /// This method has no ACID guarantees.
/// ///

@ -165,6 +165,11 @@ class TestAbstractStore(unittest.TestCase, ABC):
store.update('DELETE WHERE { ?v ?v ?v }') store.update('DELETE WHERE { ?v ?v ?v }')
self.assertEqual(len(store), 0) self.assertEqual(len(store), 0)
def test_update_load(self):
store = self.store()
store.update('LOAD <http://www.w3.org/1999/02/22-rdf-syntax-ns>')
self.assertGreater(len(store), 100)
def test_load_ntriples_to_default_graph(self): def test_load_ntriples_to_default_graph(self):
store = self.store() store = self.store()
store.load( store.load(

@ -14,7 +14,6 @@ edition = "2018"
argh = "0.1" argh = "0.1"
async-std = { version = "1", features = ["attributes"] } async-std = { version = "1", features = ["attributes"] }
async-h1 = "2" async-h1 = "2"
http-client = { version = "4", features = ["h1_client"] }
http-types = "2" http-types = "2"
oxigraph = { version = "0.1", path="../lib", features = ["rocksdb"] } oxigraph = { version = "0.1", path="../lib", features = ["rocksdb", "http_client"] }
url = "2" url = "2"

@ -15,19 +15,12 @@ use async_std::io::Read;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::{block_on, spawn, spawn_blocking}; use async_std::task::{block_on, spawn, spawn_blocking};
use http_client::h1::H1Client; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use http_client::HttpClient;
use http_types::{
format_err, headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode, Url,
};
use oxigraph::io::{DatasetFormat, GraphFormat}; use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::{GraphName, NamedNode}; use oxigraph::model::{GraphName, NamedNode};
use oxigraph::sparql::{ use oxigraph::sparql::{Query, QueryOptions, QueryResults, QueryResultsFormat, Update};
Query, QueryOptions, QueryResults, QueryResultsFormat, ServiceHandler, Update,
};
use oxigraph::RocksDbStore; use oxigraph::RocksDbStore;
use std::fmt; use std::io::BufReader;
use std::io::{BufReader, Cursor};
use std::str::FromStr; use std::str::FromStr;
use url::form_urlencoded; use url::form_urlencoded;
@ -238,7 +231,7 @@ async fn evaluate_sparql_query(
e e
})?; })?;
let mut options = QueryOptions::default().with_service_handler(HttpService::default()); let mut options = QueryOptions::default().with_simple_service_handler();
for default_graph_uri in default_graph_uris { for default_graph_uri in default_graph_uris {
options = options =
options.with_default_graph(NamedNode::new(default_graph_uri).map_err(|e| { options.with_default_graph(NamedNode::new(default_graph_uri).map_err(|e| {
@ -443,73 +436,6 @@ impl<R: Read + Unpin> std::io::Read for SyncAsyncReader<R> {
//TODO: implement other methods //TODO: implement other methods
} }
#[derive(Default)]
struct HttpService {
client: H1Client,
}
impl ServiceHandler for HttpService {
type Error = HttpServiceError;
fn handle(
&self,
service_name: NamedNode,
query: Query,
) -> std::result::Result<QueryResults, HttpServiceError> {
let mut request = Request::new(
Method::Post,
Url::parse(service_name.as_str()).map_err(Error::from)?,
);
request.append_header(headers::USER_AGENT, SERVER);
request.append_header(headers::CONTENT_TYPE, "application/sparql-query");
request.append_header(headers::ACCEPT, "application/sparql-results+xml");
request.set_body(query.to_string());
//TODO: response streaming
let response: Result<(Option<Mime>, Vec<u8>)> = block_on(async {
let mut response = self.client.send(request).await?;
Ok((response.content_type(), response.body_bytes().await?))
});
let (content_type, data) = response?;
let syntax = if let Some(content_type) = content_type {
QueryResultsFormat::from_media_type(content_type.essence()).ok_or_else(|| {
format_err!(
"Unexpected federated query result type from {}: {}",
service_name,
content_type
)
})?
} else {
QueryResultsFormat::Xml
};
Ok(QueryResults::read(Cursor::new(data), syntax).map_err(Error::from)?)
}
}
#[derive(Debug)]
struct HttpServiceError {
inner: Error,
}
impl fmt::Display for HttpServiceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl std::error::Error for HttpServiceError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.inner.as_ref())
}
}
impl From<Error> for HttpServiceError {
fn from(inner: Error) -> Self {
Self { inner }
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::handle_request; use crate::handle_request;

@ -125,10 +125,6 @@ fn sparql11_federation_w3c_evaluation_testsuite() -> Result<()> {
fn sparql11_update_w3c_evaluation_testsuite() -> Result<()> { fn sparql11_update_w3c_evaluation_testsuite() -> Result<()> {
run_testsuite( run_testsuite(
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/manifest-sparql11-update.ttl", "http://www.w3.org/2009/sparql/docs/tests/data-sparql11/manifest-sparql11-update.ttl",
vec![ vec![],
// LOAD is not implemented yet
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/update-silent/manifest#load-into-silent",
"http://www.w3.org/2009/sparql/docs/tests/data-sparql11/update-silent/manifest#load-silent"
]
) )
} }

@ -17,6 +17,6 @@ async-h1 = "2"
chrono = "0.4" chrono = "0.4"
http-client = { version = "4", features = ["h1_client"] } http-client = { version = "4", features = ["h1_client"] }
http-types = "2" http-types = "2"
oxigraph = { version = "0.1", path ="../lib", features = ["rocksdb"] } oxigraph = { version = "0.1", path ="../lib", features = ["rocksdb", "http_client"] }
serde_json = "1" serde_json = "1"
url = "2" url = "2"

@ -14,18 +14,11 @@ use argh::FromArgs;
use async_std::future::Future; use async_std::future::Future;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::{block_on, spawn, spawn_blocking}; use async_std::task::{spawn, spawn_blocking};
use http_client::h1::H1Client; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use http_client::HttpClient;
use http_types::{
format_err, headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode, Url,
};
use oxigraph::io::GraphFormat; use oxigraph::io::GraphFormat;
use oxigraph::model::NamedNode; use oxigraph::sparql::{Query, QueryOptions, QueryResults, QueryResultsFormat};
use oxigraph::sparql::{Query, QueryOptions, QueryResults, QueryResultsFormat, ServiceHandler};
use oxigraph::RocksDbStore; use oxigraph::RocksDbStore;
use std::fmt;
use std::io::Cursor;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use url::form_urlencoded; use url::form_urlencoded;
@ -189,7 +182,7 @@ async fn evaluate_sparql_query(
})?; })?;
let options = QueryOptions::default() let options = QueryOptions::default()
.with_default_graph_as_union() .with_default_graph_as_union()
.with_service_handler(HttpService::default()); .with_simple_service_handler();
let results = store.query(query, options)?; let results = store.query(query, options)?;
if let QueryResults::Graph(_) = results { if let QueryResults::Graph(_) = results {
let format = content_negotiation( let format = content_negotiation(
@ -304,70 +297,3 @@ fn content_negotiation<F>(
parse(result.essence()) parse(result.essence())
.ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
} }
#[derive(Default)]
struct HttpService {
client: H1Client,
}
impl ServiceHandler for HttpService {
type Error = HttpServiceError;
fn handle(
&self,
service_name: NamedNode,
query: Query,
) -> std::result::Result<QueryResults, HttpServiceError> {
let mut request = Request::new(
Method::Post,
Url::parse(service_name.as_str()).map_err(Error::from)?,
);
request.append_header(headers::USER_AGENT, SERVER);
request.append_header(headers::CONTENT_TYPE, "application/sparql-query");
request.append_header(headers::ACCEPT, "application/sparql-results+xml");
request.set_body(query.to_string());
//TODO: response streaming
let response: Result<(Option<Mime>, Vec<u8>)> = block_on(async {
let mut response = self.client.send(request).await?;
Ok((response.content_type(), response.body_bytes().await?))
});
let (content_type, data) = response?;
let syntax = if let Some(content_type) = content_type {
QueryResultsFormat::from_media_type(content_type.essence()).ok_or_else(|| {
format_err!(
"Unexpected federated query result type from {}: {}",
service_name,
content_type
)
})?
} else {
QueryResultsFormat::Xml
};
Ok(QueryResults::read(Cursor::new(data), syntax).map_err(Error::from)?)
}
}
#[derive(Debug)]
struct HttpServiceError {
inner: Error,
}
impl fmt::Display for HttpServiceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl std::error::Error for HttpServiceError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.inner.as_ref())
}
}
impl From<Error> for HttpServiceError {
fn from(inner: Error) -> Self {
Self { inner }
}
}

Loading…
Cancel
Save