From 240e09c3ace085cd8a604134f80925b108c72375 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sun, 26 Jul 2020 22:19:08 +0200 Subject: [PATCH] Adds federated queries to the standalone server --- server/Cargo.toml | 1 + server/Dockerfile | 1 + server/src/main.rs | 116 +++++++++++++++++++++++++++++++---------- wikibase/Cargo.toml | 2 +- wikibase/src/loader.rs | 44 +++------------- 5 files changed, 99 insertions(+), 65 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index a5d47cf8..a37e0f92 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -14,6 +14,7 @@ edition = "2018" argh = "0.1" async-std = { version = "1", features = ["attributes"] } async-h1 = "2" +http-client = { version = "4", features = ["h1_client"] } http-types = "2" oxigraph = { path = "../lib", features = ["rocksdb"] } url = "2" diff --git a/server/Dockerfile b/server/Dockerfile index fa74d98f..fbf531fd 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -6,6 +6,7 @@ RUN cd /oxigraph/server && cargo build --release FROM debian:buster-slim +RUN apt-get update && apt-get install ca-certificates -y && rm -rf /var/lib/apt/lists/* COPY --from=builder /oxigraph/target/release/oxigraph_server /usr/local/bin/oxigraph_server ENTRYPOINT [ "/usr/local/bin/oxigraph_server" ] diff --git a/server/src/main.rs b/server/src/main.rs index 8fca4ade..858b8577 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -11,15 +11,21 @@ use argh::FromArgs; use async_std::future::Future; -use async_std::io::{BufRead, Read}; +use async_std::io::Read; use async_std::net::{TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task::{block_on, spawn, spawn_blocking}; -use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; +use http_client::h1::H1Client; +use http_client::HttpClient; +use http_types::{ + format_err, headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode, Url, +}; use oxigraph::io::{DatasetFormat, GraphFormat}; -use oxigraph::model::GraphName; -use oxigraph::sparql::{Query, QueryOptions, QueryResult, QueryResultFormat}; +use oxigraph::model::{GraphName, NamedNode}; +use oxigraph::sparql::{Query, QueryOptions, QueryResult, QueryResultFormat, ServiceHandler}; use oxigraph::RocksDbStore; +use std::fmt; +use std::io::BufReader; use std::str::FromStr; use url::form_urlencoded; @@ -64,7 +70,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result Result( .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) } -struct SyncAsyncBufReader { +struct SyncAsyncReader { inner: R, } -impl From for SyncAsyncBufReader { +impl From for SyncAsyncReader { fn from(inner: R) -> Self { Self { inner } } } -impl std::io::Read for SyncAsyncBufReader { +impl std::io::Read for SyncAsyncReader { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { block_on(self.inner.read(buf)) } @@ -302,24 +313,69 @@ impl std::io::Read for SyncAsyncBufReader { //TODO: implement other methods } -impl std::io::BufRead for SyncAsyncBufReader { - fn fill_buf(&mut self) -> std::io::Result<&[u8]> { - unimplemented!() - } +#[derive(Default)] +struct HttpService { + client: H1Client, +} - fn consume(&mut self, _: usize) { - unimplemented!() +impl ServiceHandler for HttpService { + type Error = HttpServiceError; + + fn handle( + &self, + service_name: NamedNode, + query: Query, + ) -> std::result::Result { + let mut request = Request::new( + Method::Post, + Url::parse(service_name.as_str()).map_err(Error::from)?, + ); + request.append_header(headers::USER_AGENT, SERVER); + request.append_header(headers::CONTENT_TYPE, "application/sparql-query"); + request.append_header(headers::ACCEPT, "application/sparql-results+xml"); + request.set_body(query.to_string()); + + let response = block_on(async { self.client.send(request).await })?; + let syntax = if let Some(content_type) = response.content_type() { + QueryResultFormat::from_media_type(content_type.essence()).ok_or_else(|| { + format_err!( + "Unexpected federated query result type from {}: {}", + service_name, + content_type + ) + })? + } else { + QueryResultFormat::Xml + }; + Ok( + QueryResult::read(BufReader::new(SyncAsyncReader::from(response)), syntax) + .map_err(Error::from)?, + ) } +} + +#[derive(Debug)] +struct HttpServiceError { + inner: Error, +} - fn read_until(&mut self, byte: u8, buf: &mut Vec) -> std::io::Result { - block_on(self.inner.read_until(byte, buf)) +impl fmt::Display for HttpServiceError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) } +} - fn read_line(&mut self, buf: &mut String) -> std::io::Result { - block_on(self.inner.read_line(buf)) +impl std::error::Error for HttpServiceError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(self.inner.as_ref()) } } +impl From for HttpServiceError { + fn from(inner: Error) -> Self { + Self { inner } + } +} #[cfg(test)] mod tests { use crate::handle_request; @@ -419,6 +475,14 @@ mod tests { exec(request, StatusCode::UnsupportedMediaType) } + #[test] + fn post_federated_query() { + let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap()); + request.insert_header("Content-Type", "application/sparql-query"); + request.set_body("SELECT * WHERE { SERVICE { ?p ?o } }"); + exec(request, StatusCode::Ok) + } + fn exec(request: Request, expected_status: StatusCode) { let mut path = temp_dir(); path.push("temp-oxigraph-server-test"); @@ -427,13 +491,11 @@ mod tests { path.push(&s.finish().to_string()); let store = RocksDbStore::open(&path).unwrap(); - assert_eq!( - match block_on(handle_request(request, store)) { - Ok(r) => r.status(), - Err(e) => e.status(), - }, - expected_status - ); + let (code, message) = match block_on(handle_request(request, store)) { + Ok(r) => (r.status(), "".to_string()), + Err(e) => (e.status(), e.to_string()), + }; + assert_eq!(code, expected_status, "Error message: {}", message); remove_dir_all(&path).unwrap() } } diff --git a/wikibase/Cargo.toml b/wikibase/Cargo.toml index 7d692eee..65c6950a 100644 --- a/wikibase/Cargo.toml +++ b/wikibase/Cargo.toml @@ -12,10 +12,10 @@ edition = "2018" [dependencies] argh = "0.1" -async-native-tls = "0.3" async-std = { version = "1", features = ["attributes"] } async-h1 = "2" chrono = "0.4" +http-client = { version = "4", features = ["h1_client"] } http-types = "2" oxigraph = { path = "../lib", features = ["rocksdb"] } serde_json = "1" diff --git a/wikibase/src/loader.rs b/wikibase/src/loader.rs index b5a29572..f17d7587 100644 --- a/wikibase/src/loader.rs +++ b/wikibase/src/loader.rs @@ -1,9 +1,10 @@ use crate::SERVER; -use async_std::net::TcpStream; use async_std::prelude::*; use async_std::task::block_on; use chrono::{DateTime, Datelike, Utc}; -use http_types::{headers, Error, Method, Request, Response, Result, StatusCode}; +use http_client::h1::H1Client; +use http_client::HttpClient; +use http_types::{headers, Method, Request, Result}; use oxigraph::io::GraphFormat; use oxigraph::model::NamedNode; use oxigraph::RocksDbStore; @@ -16,6 +17,7 @@ use url::{form_urlencoded, Url}; pub struct WikibaseLoader { store: RocksDbStore, + client: H1Client, api_url: Url, entity_data_url: Url, namespaces: Vec, @@ -35,6 +37,7 @@ impl WikibaseLoader { ) -> Result { Ok(Self { store, + client: H1Client::new(), api_url: Url::parse(api_url)?, entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?, namespaces: namespaces.to_vec(), @@ -216,49 +219,16 @@ impl WikibaseLoader { } let url = url.join(&("?".to_owned() + &query_serializer.finish()))?; let mut request = Request::new(Method::Get, url); - request.append_header(headers::SERVER, SERVER); + request.append_header(headers::USER_AGENT, SERVER); block_on(async { - let mut response = self.request(request).await?; + let mut response = self.client.send(request).await?; let mut buffer = Vec::new(); response.read_to_end(&mut buffer).await?; Ok(buffer) }) } - async fn request(&self, request: Request) -> Result { - let addr = request - .url() - .socket_addrs(|| None)? - .into_iter() - .next() - .ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing valid address"))?; - - Ok(match request.url().scheme() { - "http" => { - let stream = TcpStream::connect(addr).await?; - async_h1::connect(stream, request).await - } - "https" => { - let stream = async_native_tls::connect( - request - .url() - .host_str() - .ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing host"))?, - TcpStream::connect(addr).await?, - ) - .await?; - async_h1::connect(stream, request).await - } - _ => { - return Err(Error::from_str( - StatusCode::BadRequest, - "missing valid address", - )) - } - }?) - } - fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> { let graph_name = NamedNode::new(uri)?.into(); self.store.transaction(|transaction| {