Adds federated queries to the standalone server

pull/46/head
Tpt 5 years ago
parent 0a2566b100
commit 240e09c3ac
  1. 1
      server/Cargo.toml
  2. 1
      server/Dockerfile
  3. 116
      server/src/main.rs
  4. 2
      wikibase/Cargo.toml
  5. 44
      wikibase/src/loader.rs

@ -14,6 +14,7 @@ edition = "2018"
argh = "0.1" argh = "0.1"
async-std = { version = "1", features = ["attributes"] } async-std = { version = "1", features = ["attributes"] }
async-h1 = "2" async-h1 = "2"
http-client = { version = "4", features = ["h1_client"] }
http-types = "2" http-types = "2"
oxigraph = { path = "../lib", features = ["rocksdb"] } oxigraph = { path = "../lib", features = ["rocksdb"] }
url = "2" url = "2"

@ -6,6 +6,7 @@ RUN cd /oxigraph/server && cargo build --release
FROM debian:buster-slim FROM debian:buster-slim
RUN apt-get update && apt-get install ca-certificates -y && rm -rf /var/lib/apt/lists/*
COPY --from=builder /oxigraph/target/release/oxigraph_server /usr/local/bin/oxigraph_server COPY --from=builder /oxigraph/target/release/oxigraph_server /usr/local/bin/oxigraph_server
ENTRYPOINT [ "/usr/local/bin/oxigraph_server" ] ENTRYPOINT [ "/usr/local/bin/oxigraph_server" ]

@ -11,15 +11,21 @@
use argh::FromArgs; use argh::FromArgs;
use async_std::future::Future; use async_std::future::Future;
use async_std::io::{BufRead, Read}; use async_std::io::Read;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::{block_on, spawn, spawn_blocking}; use async_std::task::{block_on, spawn, spawn_blocking};
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use http_client::h1::H1Client;
use http_client::HttpClient;
use http_types::{
format_err, headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode, Url,
};
use oxigraph::io::{DatasetFormat, GraphFormat}; use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::GraphName; use oxigraph::model::{GraphName, NamedNode};
use oxigraph::sparql::{Query, QueryOptions, QueryResult, QueryResultFormat}; use oxigraph::sparql::{Query, QueryOptions, QueryResult, QueryResultFormat, ServiceHandler};
use oxigraph::RocksDbStore; use oxigraph::RocksDbStore;
use std::fmt;
use std::io::BufReader;
use std::str::FromStr; use std::str::FromStr;
use url::form_urlencoded; use url::form_urlencoded;
@ -64,7 +70,7 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
match if let Some(format) = GraphFormat::from_media_type(content_type.essence()) { match if let Some(format) = GraphFormat::from_media_type(content_type.essence()) {
spawn_blocking(move || { spawn_blocking(move || {
store.load_graph( store.load_graph(
SyncAsyncBufReader::from(request), BufReader::new(SyncAsyncReader::from(request)),
format, format,
&GraphName::DefaultGraph, &GraphName::DefaultGraph,
None, None,
@ -73,7 +79,11 @@ async fn handle_request(request: Request, store: RocksDbStore) -> Result<Respons
} else if let Some(format) = DatasetFormat::from_media_type(content_type.essence()) } else if let Some(format) = DatasetFormat::from_media_type(content_type.essence())
{ {
spawn_blocking(move || { spawn_blocking(move || {
store.load_dataset(SyncAsyncBufReader::from(request), format, None) store.load_dataset(
BufReader::new(SyncAsyncReader::from(request)),
format,
None,
)
}) })
} else { } else {
return Ok(simple_response( return Ok(simple_response(
@ -170,7 +180,8 @@ async fn evaluate_sparql_query(
e.set_status(StatusCode::BadRequest); e.set_status(StatusCode::BadRequest);
e e
})?; })?;
let results = store.query(query, QueryOptions::default())?; let options = QueryOptions::default().with_service_handler(HttpService::default());
let results = store.query(query, options)?;
//TODO: stream //TODO: stream
if let QueryResult::Graph(_) = results { if let QueryResult::Graph(_) = results {
let format = content_negotiation( let format = content_negotiation(
@ -284,17 +295,17 @@ fn content_negotiation<F>(
.ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type")) .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
} }
struct SyncAsyncBufReader<R: Unpin> { struct SyncAsyncReader<R: Unpin> {
inner: R, inner: R,
} }
impl<R: Unpin> From<R> for SyncAsyncBufReader<R> { impl<R: Unpin> From<R> for SyncAsyncReader<R> {
fn from(inner: R) -> Self { fn from(inner: R) -> Self {
Self { inner } Self { inner }
} }
} }
impl<R: Read + Unpin> std::io::Read for SyncAsyncBufReader<R> { impl<R: Read + Unpin> std::io::Read for SyncAsyncReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
block_on(self.inner.read(buf)) block_on(self.inner.read(buf))
} }
@ -302,24 +313,69 @@ impl<R: Read + Unpin> std::io::Read for SyncAsyncBufReader<R> {
//TODO: implement other methods //TODO: implement other methods
} }
impl<R: BufRead + Unpin> std::io::BufRead for SyncAsyncBufReader<R> { #[derive(Default)]
fn fill_buf(&mut self) -> std::io::Result<&[u8]> { struct HttpService {
unimplemented!() client: H1Client,
} }
fn consume(&mut self, _: usize) { impl ServiceHandler for HttpService {
unimplemented!() type Error = HttpServiceError;
fn handle(
&self,
service_name: NamedNode,
query: Query,
) -> std::result::Result<QueryResult, HttpServiceError> {
let mut request = Request::new(
Method::Post,
Url::parse(service_name.as_str()).map_err(Error::from)?,
);
request.append_header(headers::USER_AGENT, SERVER);
request.append_header(headers::CONTENT_TYPE, "application/sparql-query");
request.append_header(headers::ACCEPT, "application/sparql-results+xml");
request.set_body(query.to_string());
let response = block_on(async { self.client.send(request).await })?;
let syntax = if let Some(content_type) = response.content_type() {
QueryResultFormat::from_media_type(content_type.essence()).ok_or_else(|| {
format_err!(
"Unexpected federated query result type from {}: {}",
service_name,
content_type
)
})?
} else {
QueryResultFormat::Xml
};
Ok(
QueryResult::read(BufReader::new(SyncAsyncReader::from(response)), syntax)
.map_err(Error::from)?,
)
} }
}
#[derive(Debug)]
struct HttpServiceError {
inner: Error,
}
fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> { impl fmt::Display for HttpServiceError {
block_on(self.inner.read_until(byte, buf)) fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
} }
}
fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> { impl std::error::Error for HttpServiceError {
block_on(self.inner.read_line(buf)) fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(self.inner.as_ref())
} }
} }
impl From<Error> for HttpServiceError {
fn from(inner: Error) -> Self {
Self { inner }
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::handle_request; use crate::handle_request;
@ -419,6 +475,14 @@ mod tests {
exec(request, StatusCode::UnsupportedMediaType) exec(request, StatusCode::UnsupportedMediaType)
} }
#[test]
fn post_federated_query() {
let mut request = Request::new(Method::Post, Url::parse("http://localhost/query").unwrap());
request.insert_header("Content-Type", "application/sparql-query");
request.set_body("SELECT * WHERE { SERVICE <https://query.wikidata.org/sparql> { <https://en.wikipedia.org/wiki/Paris> ?p ?o } }");
exec(request, StatusCode::Ok)
}
fn exec(request: Request, expected_status: StatusCode) { fn exec(request: Request, expected_status: StatusCode) {
let mut path = temp_dir(); let mut path = temp_dir();
path.push("temp-oxigraph-server-test"); path.push("temp-oxigraph-server-test");
@ -427,13 +491,11 @@ mod tests {
path.push(&s.finish().to_string()); path.push(&s.finish().to_string());
let store = RocksDbStore::open(&path).unwrap(); let store = RocksDbStore::open(&path).unwrap();
assert_eq!( let (code, message) = match block_on(handle_request(request, store)) {
match block_on(handle_request(request, store)) { Ok(r) => (r.status(), "".to_string()),
Ok(r) => r.status(), Err(e) => (e.status(), e.to_string()),
Err(e) => e.status(), };
}, assert_eq!(code, expected_status, "Error message: {}", message);
expected_status
);
remove_dir_all(&path).unwrap() remove_dir_all(&path).unwrap()
} }
} }

@ -12,10 +12,10 @@ edition = "2018"
[dependencies] [dependencies]
argh = "0.1" argh = "0.1"
async-native-tls = "0.3"
async-std = { version = "1", features = ["attributes"] } async-std = { version = "1", features = ["attributes"] }
async-h1 = "2" async-h1 = "2"
chrono = "0.4" chrono = "0.4"
http-client = { version = "4", features = ["h1_client"] }
http-types = "2" http-types = "2"
oxigraph = { path = "../lib", features = ["rocksdb"] } oxigraph = { path = "../lib", features = ["rocksdb"] }
serde_json = "1" serde_json = "1"

@ -1,9 +1,10 @@
use crate::SERVER; use crate::SERVER;
use async_std::net::TcpStream;
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task::block_on; use async_std::task::block_on;
use chrono::{DateTime, Datelike, Utc}; use chrono::{DateTime, Datelike, Utc};
use http_types::{headers, Error, Method, Request, Response, Result, StatusCode}; use http_client::h1::H1Client;
use http_client::HttpClient;
use http_types::{headers, Method, Request, Result};
use oxigraph::io::GraphFormat; use oxigraph::io::GraphFormat;
use oxigraph::model::NamedNode; use oxigraph::model::NamedNode;
use oxigraph::RocksDbStore; use oxigraph::RocksDbStore;
@ -16,6 +17,7 @@ use url::{form_urlencoded, Url};
pub struct WikibaseLoader { pub struct WikibaseLoader {
store: RocksDbStore, store: RocksDbStore,
client: H1Client,
api_url: Url, api_url: Url,
entity_data_url: Url, entity_data_url: Url,
namespaces: Vec<u32>, namespaces: Vec<u32>,
@ -35,6 +37,7 @@ impl WikibaseLoader {
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
store, store,
client: H1Client::new(),
api_url: Url::parse(api_url)?, api_url: Url::parse(api_url)?,
entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?, entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
namespaces: namespaces.to_vec(), namespaces: namespaces.to_vec(),
@ -216,49 +219,16 @@ impl WikibaseLoader {
} }
let url = url.join(&("?".to_owned() + &query_serializer.finish()))?; let url = url.join(&("?".to_owned() + &query_serializer.finish()))?;
let mut request = Request::new(Method::Get, url); let mut request = Request::new(Method::Get, url);
request.append_header(headers::SERVER, SERVER); request.append_header(headers::USER_AGENT, SERVER);
block_on(async { block_on(async {
let mut response = self.request(request).await?; let mut response = self.client.send(request).await?;
let mut buffer = Vec::new(); let mut buffer = Vec::new();
response.read_to_end(&mut buffer).await?; response.read_to_end(&mut buffer).await?;
Ok(buffer) Ok(buffer)
}) })
} }
async fn request(&self, request: Request) -> Result<Response> {
let addr = request
.url()
.socket_addrs(|| None)?
.into_iter()
.next()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing valid address"))?;
Ok(match request.url().scheme() {
"http" => {
let stream = TcpStream::connect(addr).await?;
async_h1::connect(stream, request).await
}
"https" => {
let stream = async_native_tls::connect(
request
.url()
.host_str()
.ok_or_else(|| Error::from_str(StatusCode::BadRequest, "missing host"))?,
TcpStream::connect(addr).await?,
)
.await?;
async_h1::connect(stream, request).await
}
_ => {
return Err(Error::from_str(
StatusCode::BadRequest,
"missing valid address",
))
}
}?)
}
fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> { fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
let graph_name = NamedNode::new(uri)?.into(); let graph_name = NamedNode::new(uri)?.into();
self.store.transaction(|transaction| { self.store.transaction(|transaction| {

Loading…
Cancel
Save