parent f64001906c
commit 04827a71f8
@@ -1,22 +0,0 @@
[package]
name = "oxigraph_wikibase"
version = "0.3.0-dev"
authors = ["Tpt <thomas@pellissier-tanon.fr>"]
license = "MIT OR Apache-2.0"
readme = "README.md"
repository = "https://github.com/oxigraph/oxigraph/tree/master/wikibase"
description = """
Oxigraph SPARQL server for Wikibase
"""
edition = "2021"

[dependencies]
clap = "2"
async-std = { version = "1", features = ["attributes"] }
async-h1 = "2"
chrono = "0.4"
http-client = { version = "6", features = ["h1_client"] }
http-types = "2"
oxigraph = { version = "0.3.0-dev", path = "../lib", features = ["http_client"] }
serde_json = "1"
url = "2"
@@ -1,11 +0,0 @@
FROM rust:1-buster as builder

COPY . /oxigraph
RUN cd /oxigraph/wikibase && cargo build --release

FROM debian:buster-slim

RUN apt-get update && apt-get install ca-certificates -y && rm -rf /var/lib/apt/lists/*
COPY --from=builder /oxigraph/target/release/oxigraph_wikibase /usr/local/bin/oxigraph_wikibase

ENTRYPOINT [ "/usr/local/bin/oxigraph_wikibase" ]
@@ -1,81 +0,0 @@
Oxigraph Wikibase
=================

[![Latest Version](https://img.shields.io/crates/v/oxigraph_wikibase.svg)](https://crates.io/crates/oxigraph_wikibase)
[![Crates.io](https://img.shields.io/crates/d/oxigraph_wikibase)](https://crates.io/crates/oxigraph_wikibase)
[![Docker Image Version (latest semver)](https://img.shields.io/docker/v/oxigraph/oxigraph-wikibase?sort=semver)](https://hub.docker.com/repository/docker/oxigraph/oxigraph-wikibase)
[![Docker Image Size (latest semver)](https://img.shields.io/docker/image-size/oxigraph/oxigraph-wikibase)](https://hub.docker.com/repository/docker/oxigraph/oxigraph-wikibase)
[![Docker Pulls](https://img.shields.io/docker/pulls/oxigraph/oxigraph-wikibase)](https://hub.docker.com/repository/docker/oxigraph/oxigraph-wikibase)
[![actions status](https://github.com/oxigraph/oxigraph/workflows/build/badge.svg)](https://github.com/oxigraph/oxigraph/actions)
[![Gitter](https://badges.gitter.im/oxigraph/community.svg)](https://gitter.im/oxigraph/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge)

Oxigraph Wikibase is a [SPARQL](https://www.w3.org/TR/sparql11-overview/) web server able to synchronize with a [Wikibase instance](https://wikiba.se/).
It is based on [Oxigraph](https://crates.io/crates/oxigraph).

Oxigraph and Oxigraph Wikibase are still under heavy development and have not been optimized yet.

## Installation

You need to have [a recent stable version of Rust and Cargo installed](https://www.rust-lang.org/tools/install).

To download, build and install the latest released version, run `cargo install oxigraph_wikibase`.
There is no need to clone the git repository.

To compile the server from source, clone this git repository and execute `cargo build --release` in the `wikibase` directory; Cargo will download the dependencies and build the full server.
It will create a fat binary in `target/release/oxigraph_wikibase`.

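For reference, the two installation paths as shell commands (the clone URL is the `oxigraph/oxigraph` repository from the package metadata; adjust it if you work from a fork):
```sh
# Install the latest release from crates.io
cargo install oxigraph_wikibase

# Or build from a clone of this repository
git clone https://github.com/oxigraph/oxigraph.git
cd oxigraph/wikibase
cargo build --release
```
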
## Usage

To start a server that is synchronized with [test.wikidata.org](https://test.wikidata.org), run:
```bash
./oxigraph_wikibase --mediawiki-api https://test.wikidata.org/w/api.php --mediawiki-base-url https://test.wikidata.org/wiki/ --namespaces 0,120 --file test.wikidata
```

It creates a SPARQL endpoint listening on `localhost:7878/query` that can be queried just like Blazegraph.

The configuration parameters are:
* `mediawiki_api`: URL of the MediaWiki API to use.
* `mediawiki_base_url`: Base URL of MediaWiki pages, like `https://test.wikidata.org/wiki/` for test.wikidata.org or `http://localhost/w/index.php?title=` for "vanilla" installations.
* `namespaces`: The ids of the Wikibase namespaces to synchronize with, separated by `,`.
* `file`: Path of the directory where Oxigraph should store its data.

You can then access it from your machine on port `7878`. No GUI is provided.
```sh
# Make a query
curl -X POST -H 'Accept: application/sparql-results+json' -H 'Content-Type: application/sparql-query' --data 'SELECT * WHERE { ?s ?p ?o } LIMIT 10' http://localhost:7878/query
```

## Using a Docker image

### Display the help menu
```sh
docker run --rm oxigraph/oxigraph-wikibase --help
```

### Run the Web server
Expose the server on port `7878` of the host machine, and save data in the local `./wikibase_data` folder:
```sh
docker run --init --rm -v $PWD/wikibase_data:/wikibase_data -p 7878:7878 oxigraph/oxigraph-wikibase -b 0.0.0.0:7878 -f /wikibase_data --mediawiki-api http://some.wikibase.instance/w/api.php --mediawiki-base-url http://some.wikibase.instance/wiki/
```

Warning: the Wikibase instance needs to be accessible from within the container.
The cleanest way to do that is to define both your Wikibase and `oxigraph_wikibase` in the same [`docker-compose.yml`](https://docs.docker.com/compose/).
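
If you do not want to write a Compose file, a shared user-defined Docker network achieves the same thing. A minimal sketch (the network and container names and the Wikibase image are placeholders to adapt to your setup):
```sh
# Put both containers on the same network so oxigraph_wikibase can reach the Wikibase instance by name
docker network create wikibase-net
docker run -d --name wikibase --network wikibase-net <your-wikibase-image>
docker run --init --rm --network wikibase-net -v $PWD/wikibase_data:/wikibase_data -p 7878:7878 \
  oxigraph/oxigraph-wikibase -b 0.0.0.0:7878 -f /wikibase_data \
  --mediawiki-api http://wikibase/w/api.php --mediawiki-base-url http://wikibase/wiki/
```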

You can easily build your own Docker image by running `docker build -t oxigraph-wikibase -f wikibase/Dockerfile .` from the root directory.


## License

This project is licensed under either of

* Apache License, Version 2.0, ([LICENSE-APACHE](../LICENSE-APACHE) or
  http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](../LICENSE-MIT) or
  http://opensource.org/licenses/MIT)

at your option.


### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Oxigraph by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
@@ -1,249 +0,0 @@
use crate::SERVER;
use async_std::prelude::*;
use async_std::task::block_on;
use chrono::{DateTime, Datelike, Utc};
use http_client::h1::H1Client;
use http_client::HttpClient;
use http_types::{headers, Method, Request, Result};
use oxigraph::io::GraphFormat;
use oxigraph::model::NamedNodeRef;
use oxigraph::store::Store;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::io::{BufReader, Cursor, Read};
use std::thread::sleep;
use std::time::Duration;
use url::{form_urlencoded, Url};

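/// Keeps an Oxigraph `Store` synchronized with a Wikibase instance by polling
/// the MediaWiki API and mirroring each entity as a named graph.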
pub struct WikibaseLoader {
    store: Store,
    client: H1Client,
    api_url: Url,
    entity_data_url: Url,
    namespaces: Vec<u32>,
    slot: Option<String>,
    frequency: Duration,
    start: DateTime<Utc>,
}

impl WikibaseLoader {
    pub fn new(
        store: Store,
        api_url: &str,
        pages_base_url: &str,
        namespaces: &[u32],
        slot: Option<&str>,
        frequency: Duration,
    ) -> Result<Self> {
        Ok(Self {
            store,
            client: H1Client::new(),
            api_url: Url::parse(api_url)?,
            entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
            namespaces: namespaces.to_vec(),
            slot: slot.map(|t| t.to_owned()),
            start: Utc::now(),
            frequency,
        })
    }

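    /// Performs the initial bulk load: lists every page of the configured namespaces
    /// through the MediaWiki `allpages` API and imports each entity's RDF data.
    /// When a slot is configured there is no good bulk source, so the start timestamp
    /// is pushed back to 2018 and loading is left to the update loop.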
    pub fn initial_loading(&mut self) -> Result<()> {
        self.start = Utc::now();

        if self.slot.is_some() {
            println!("Skipping initial loading because a slot is required");
            // No good initial loading
            self.start = self.start.with_year(2018).unwrap();
            return Ok(());
        }

        println!("Initial loading ");
        for namespace in &self.namespaces {
            let mut parameters = HashMap::default();
            parameters.insert("action".to_owned(), "query".to_owned());
            parameters.insert("list".to_owned(), "allpages".to_owned());
            parameters.insert("apnamespace".to_owned(), namespace.to_string());
            parameters.insert("aplimit".to_owned(), "50".to_owned());

            self.api_get_with_continue(parameters, |results| {
                println!("*");
                for page in results
                    .as_object()
                    .unwrap()
                    .get("query")
                    .unwrap()
                    .get("allpages")
                    .unwrap()
                    .as_array()
                    .unwrap()
                {
                    let desc = page.as_object().unwrap();
                    let title = desc.get("title").unwrap().as_str().unwrap();

                    let id = title.split(':').last().unwrap_or(title);

                    match self.get_entity_data(id) {
                        Ok(data) => {
                            self.load_entity_data(
                                &(self.entity_data_url.to_string() + "/" + id),
                                Cursor::new(data),
                            )?;
                        }
                        Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e),
                    }
                }
                Ok(())
            })?;
        }
        Ok(())
    }

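    /// Runs forever: each pass replays the changes made since the previous pass,
    /// and passes are spaced by at least `frequency`.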
    pub fn update_loop(&mut self) {
        loop {
            let new_start = Utc::now();
            if let Err(e) = self.refresh_step(self.start) {
                eprintln!("WikibaseLoader error: {}", e);
            }
            self.start = new_start;
            let elapsed = (Utc::now() - self.start).to_std().unwrap();
            if elapsed < self.frequency {
                sleep(self.frequency - elapsed);
            }
        }
    }

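    /// Fetches the MediaWiki `recentchanges` feed back to `start` and reloads every
    /// entity that changed, deduplicating ids within the pass. File pages (namespace 6)
    /// are mapped to MediaInfo `M<pageid>` identifiers.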
    fn refresh_step(&self, start: DateTime<Utc>) -> Result<()> {
        let mut seen: HashSet<String> = HashSet::default();

        let mut parameters = HashMap::default();
        parameters.insert("action".to_owned(), "query".to_owned());
        parameters.insert("list".to_owned(), "recentchanges".to_owned());
        if let Some(slot) = &self.slot {
            parameters.insert("rcslot".to_owned(), slot.to_owned());
        } else {
            parameters.insert(
                "rcnamespace".to_owned(),
                self.namespaces
                    .iter()
                    .map(|ns| ns.to_string())
                    .collect::<Vec<_>>()
                    .join("|"),
            );
        }
        parameters.insert("rcend".to_owned(), start.to_rfc2822());
        parameters.insert("rcprop".to_owned(), "title|ids".to_owned());
        parameters.insert("rclimit".to_owned(), "50".to_owned());

        self.api_get_with_continue(parameters, |results| {
            for change in results
                .as_object()
                .unwrap()
                .get("query")
                .unwrap()
                .get("recentchanges")
                .unwrap()
                .as_array()
                .unwrap()
            {
                let desc = change.as_object().unwrap();
                let id = if desc.get("ns").unwrap().as_u64().unwrap() == 6 {
                    // Hack for media info
                    format!("M{}", desc.get("pageid").unwrap().as_u64().unwrap())
                } else {
                    let title = desc.get("title").unwrap().as_str().unwrap();
                    title.split(':').last().unwrap_or(title).to_owned()
                };
                if seen.contains(&id) {
                    continue;
                }
                seen.insert(id.clone());

                match self.get_entity_data(&id) {
                    Ok(data) => {
                        self.load_entity_data(
                            &format!("{}/{}", self.entity_data_url, id),
                            Cursor::new(data),
                        )?;
                    }
                    Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e),
                }
            }
            Ok(())
        })
    }

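    /// Calls the MediaWiki API and hands each page of results to `on_results`,
    /// following `continue` tokens until the result set is exhausted.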
    fn api_get_with_continue(
        &self,
        mut parameters: HashMap<String, String>,
        mut on_results: impl FnMut(&Value) -> Result<()>,
    ) -> Result<()> {
        loop {
            let results = self.api_get(&mut parameters)?;
            on_results(&results)?;

            if let Some(cont) = results.get("continue") {
                for (k, v) in cont.as_object().unwrap().iter() {
                    parameters.insert(k.to_owned(), v.as_str().unwrap().to_owned());
                }
            } else {
                return Ok(());
            }
        }
    }

    fn api_get(&self, parameters: &mut HashMap<String, String>) -> Result<Value> {
        parameters.insert("format".to_owned(), "json".to_owned());

        Ok(serde_json::from_slice(
            &self.get_request(&self.api_url, parameters)?,
        )?)
    }

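    /// Downloads the N-Triples dump of a single entity from the `Special:EntityData` endpoint.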
    fn get_entity_data(&self, id: &str) -> Result<Vec<u8>> {
        self.get_request(
            &self.entity_data_url,
            [("id", id), ("format", "nt"), ("flavor", "dump")]
                .iter()
                .cloned(),
        )
    }

    fn get_request<K: AsRef<str>, V: AsRef<str>>(
        &self,
        url: &Url,
        params: impl IntoIterator<Item = (K, V)>,
    ) -> Result<Vec<u8>> {
        let mut query_serializer = form_urlencoded::Serializer::new(String::new());
        for (k, v) in params {
            query_serializer.append_pair(k.as_ref(), v.as_ref());
        }
        let url = url.join(&("?".to_owned() + &query_serializer.finish()))?;
        let mut request = Request::new(Method::Get, url);
        request.append_header(headers::USER_AGENT, SERVER);

        block_on(async {
            let mut response = self.client.send(request).await?;
            let mut buffer = Vec::new();
            response.read_to_end(&mut buffer).await?;
            Ok(buffer)
        })
    }

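    /// Replaces the named graph of an entity: removes all existing quads in the graph,
    /// then loads the freshly fetched N-Triples data into it.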
    fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
        let graph_name = NamedNodeRef::new(uri)?;
        //TODO: proper transaction
        for q in self
            .store
            .quads_for_pattern(None, None, None, Some(graph_name.into()))
        {
            self.store.remove(&q?)?;
        }
        self.store.load_graph(
            BufReader::new(data),
            GraphFormat::NTriples,
            graph_name,
            None,
        )?;
        Ok(())
    }
}
@@ -1,370 +0,0 @@
#![deny(
    future_incompatible,
    nonstandard_style,
    rust_2018_idioms,
    missing_copy_implementations,
    trivial_casts,
    trivial_numeric_casts,
    unsafe_code,
    unused_qualifications
)]

use crate::loader::WikibaseLoader;
use async_std::future::Future;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task::spawn;
use clap::{crate_version, App, Arg};
use http_types::content::ContentType;
use http_types::{
    bail_status, format_err_status, headers, Error, Method, Mime, Request, Response, Result,
    StatusCode,
};
use oxigraph::io::GraphFormat;
use oxigraph::model::{GraphName, NamedNode, NamedOrBlankNode};
use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat};
use oxigraph::store::Store;
use std::str::FromStr;
use std::time::Duration;
use url::form_urlencoded;

mod loader;

const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576;
const SERVER: &str = concat!("Oxigraph/", env!("CARGO_PKG_VERSION"));

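/// Parses the command line, opens (or creates) the store, spawns the Wikibase
/// synchronization task in the background and serves the SPARQL endpoint.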
#[async_std::main]
pub async fn main() -> Result<()> {
    let matches = App::new("Oxigraph SPARQL server for Wikibase")
        .version(crate_version!())
        .arg(
            Arg::with_name("bind")
                .short("b")
                .long("bind")
                .help("Host and port to listen on")
                .default_value("localhost:7878")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("file")
                .short("f")
                .long("file")
                .help("Directory in which to persist the data")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("mediawiki_api")
                .long("mediawiki_api")
                .help("Base URL of the MediaWiki API like https://www.wikidata.org/w/api.php")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("mediawiki_base_url")
                .long("mediawiki_base_url")
                .help("Base URL of MediaWiki like https://www.wikidata.org/wiki/")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("namespaces")
                .long("namespaces")
                .help("Namespace ids to load, like '0,120'")
                .default_value("")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("slot")
                .long("slot")
                .help("Slot to load, like 'mediainfo'. Cannot be used together with namespaces")
                .takes_value(true),
        )
        .get_matches();
    let bind = matches.value_of("bind").unwrap();
    let file = matches.value_of("file");
    let mediawiki_api = matches.value_of("mediawiki_api").unwrap();
    let mediawiki_base_url = matches.value_of("mediawiki_base_url").unwrap();
    let namespaces = matches
        .value_of("namespaces")
        .unwrap()
        .split(',')
        .flat_map(|t| {
            let t = t.trim();
            if t.is_empty() {
                None
            } else {
                Some(u32::from_str(t).unwrap())
            }
        })
        .collect::<Vec<_>>();
    let slot = matches.value_of("slot");

    let store = if let Some(file) = file {
        Store::open(file)
    } else {
        Store::new()
    }?;
    let repo = store.clone();
    let mut loader = WikibaseLoader::new(
        repo,
        mediawiki_api,
        mediawiki_base_url,
        &namespaces,
        slot,
        Duration::new(10, 0),
    )
    .unwrap();
    spawn(async move {
        loader.initial_loading().unwrap();
        loader.update_loop();
    });

    println!("Listening for requests at http://{}", &bind);

    http_server(bind, move |request| handle_request(request, store.clone())).await
}

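/// Routes HTTP requests: implements the SPARQL Protocol query operation on `/query`
/// for GET (query in the URL) and POST (raw `application/sparql-query` or
/// form-encoded body); everything else is rejected.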
async fn handle_request(request: Request, store: Store) -> Result<Response> {
    Ok(match (request.url().path(), request.method()) {
        ("/query", Method::Get) => {
            configure_and_evaluate_sparql_query(store, url_query(&request), None, request)?
        }
        ("/query", Method::Post) => {
            if let Some(content_type) = request.content_type() {
                if content_type.essence() == "application/sparql-query" {
                    let mut buffer = String::new();
                    let mut request = request;
                    request
                        .take_body()
                        .take(MAX_SPARQL_BODY_SIZE)
                        .read_to_string(&mut buffer)
                        .await?;
                    configure_and_evaluate_sparql_query(
                        store,
                        url_query(&request),
                        Some(buffer),
                        request,
                    )?
                } else if content_type.essence() == "application/x-www-form-urlencoded" {
                    let mut buffer = Vec::new();
                    let mut request = request;
                    request
                        .take_body()
                        .take(MAX_SPARQL_BODY_SIZE)
                        .read_to_end(&mut buffer)
                        .await?;
                    configure_and_evaluate_sparql_query(store, buffer, None, request)?
                } else {
                    bail_status!(415, "Not supported Content-Type given: {}", content_type);
                }
            } else {
                bail_status!(400, "No Content-Type given");
            }
        }
        _ => {
            bail_status!(
                404,
                "{} {} is not supported by this server",
                request.method(),
                request.url().path()
            );
        }
    })
}

fn url_query(request: &Request) -> Vec<u8> {
    request.url().query().unwrap_or("").as_bytes().to_vec()
}

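/// Extracts the `query`, `default-graph-uri` and `named-graph-uri` parameters from
/// the form-encoded input and evaluates the query, or fails with 400 if no query was given.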
fn configure_and_evaluate_sparql_query(
    store: Store,
    encoded: Vec<u8>,
    mut query: Option<String>,
    request: Request,
) -> Result<Response> {
    let mut default_graph_uris = Vec::new();
    let mut named_graph_uris = Vec::new();
    for (k, v) in form_urlencoded::parse(&encoded) {
        match k.as_ref() {
            "query" => {
                if query.is_some() {
                    bail_status!(400, "Multiple query parameters provided")
                }
                query = Some(v.into_owned())
            }
            "default-graph-uri" => default_graph_uris.push(v.into_owned()),
            "named-graph-uri" => named_graph_uris.push(v.into_owned()),
            _ => (),
        }
    }
    if let Some(query) = query {
        evaluate_sparql_query(store, query, default_graph_uris, named_graph_uris, request)
    } else {
        bail_status!(400, "You should set the 'query' parameter")
    }
}

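/// Parses and evaluates a SPARQL query against the store, applying any requested
/// dataset override, and serializes the results in the format negotiated from the
/// `Accept` header.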
fn evaluate_sparql_query(
    store: Store,
    query: String,
    default_graph_uris: Vec<String>,
    named_graph_uris: Vec<String>,
    request: Request,
) -> Result<Response> {
    let mut query = Query::parse(&query, None).map_err(bad_request)?;
    let default_graph_uris = default_graph_uris
        .into_iter()
        .map(|e| Ok(NamedNode::new(e)?.into()))
        .collect::<Result<Vec<GraphName>>>()
        .map_err(bad_request)?;
    let named_graph_uris = named_graph_uris
        .into_iter()
        .map(|e| Ok(NamedNode::new(e)?.into()))
        .collect::<Result<Vec<NamedOrBlankNode>>>()
        .map_err(bad_request)?;

    if !default_graph_uris.is_empty() || !named_graph_uris.is_empty() {
        query.dataset_mut().set_default_graph(default_graph_uris);
        query
            .dataset_mut()
            .set_available_named_graphs(named_graph_uris);
    }

    let results = store.query(query)?;
    //TODO: stream
    if let QueryResults::Graph(_) = results {
        let format = content_negotiation(
            request,
            &[
                GraphFormat::NTriples.media_type(),
                GraphFormat::Turtle.media_type(),
                GraphFormat::RdfXml.media_type(),
            ],
            GraphFormat::from_media_type,
        )?;
        let mut body = Vec::default();
        results.write_graph(&mut body, format)?;
        let mut response = Response::from(body);
        ContentType::new(format.media_type()).apply(&mut response);
        Ok(response)
    } else {
        let format = content_negotiation(
            request,
            &[
                QueryResultsFormat::Xml.media_type(),
                QueryResultsFormat::Json.media_type(),
                QueryResultsFormat::Csv.media_type(),
                QueryResultsFormat::Tsv.media_type(),
            ],
            QueryResultsFormat::from_media_type,
        )?;
        let mut body = Vec::default();
        results.write(&mut body, format)?;
        let mut response = Response::from(body);
        ContentType::new(format.media_type()).apply(&mut response);
        Ok(response)
    }
}

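/// Minimal HTTP/1 server built on async-h1: accepts TCP connections and spawns a task
/// per connection that runs `handle` and appends the `Server` header to every response.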
async fn http_server<
    F: Clone + Send + Sync + 'static + Fn(Request) -> Fut,
    Fut: Send + Future<Output = Result<Response>>,
>(
    host: &str,
    handle: F,
) -> Result<()> {
    async fn accept<F: Fn(Request) -> Fut, Fut: Future<Output = Result<Response>>>(
        stream: TcpStream,
        handle: F,
    ) -> Result<()> {
        async_h1::accept(stream, |request| async {
            let mut response = match handle(request).await {
                Ok(result) => result,
                Err(error) => {
                    if error.status().is_server_error() {
                        eprintln!("{}", error);
                    }
                    let mut response = Response::new(error.status());
                    response.set_body(error.to_string());
                    response
                }
            };
            response.append_header(headers::SERVER, SERVER);
            Ok(response)
        })
        .await
    }

    let listener = TcpListener::bind(&host).await?;
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        let handle = handle.clone();
        spawn(async {
            if let Err(err) = accept(stream, handle).await {
                eprintln!("{}", err);
            };
        });
    }
    Ok(())
}

fn bad_request(e: impl Into<Error>) -> Error {
    let mut e = e.into();
    e.set_status(StatusCode::BadRequest);
    e
}

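/// Picks the best match between the request's `Accept` header and the supported media
/// types, defaulting to the first supported type when no header is given and failing
/// with 406 when nothing matches.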
fn content_negotiation<F>(
    request: Request,
    supported: &[&str],
    parse: impl Fn(&str) -> Option<F>,
) -> Result<F> {
    let header = request
        .header(headers::ACCEPT)
        .map(|h| h.last().as_str().trim())
        .unwrap_or("");
    let supported_mime: Vec<Mime> = supported
        .iter()
        .map(|h| Mime::from_str(h).unwrap())
        .collect();

    if header.is_empty() {
        return parse(supported.first().unwrap())
            .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"));
    }
    let mut result = None;
    let mut result_score = 0f32;

    for possible in header.split(',') {
        let possible = Mime::from_str(possible.trim())?;
        let score = if let Some(q) = possible.param("q") {
            f32::from_str(&q.to_string())?
        } else {
            1.
        };
        if score <= result_score {
            continue;
        }
        for candidate in &supported_mime {
            if (possible.basetype() == candidate.basetype() || possible.basetype() == "*")
                && (possible.subtype() == candidate.subtype() || possible.subtype() == "*")
            {
                result = Some(candidate);
                result_score = score;
                break;
            }
        }
    }

    let result = result.ok_or_else(|| {
        format_err_status!(
            406,
            "The available Content-Types are {}",
            supported.join(", ")
        )
    })?;

    parse(result.essence())
        .ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
}