Uses async-h1 in oxigraph_wikibase

pull/35/head
Tpt 4 years ago
parent 0440b8ce80
commit 6930a84521
  1. 4
      server/Cargo.toml
  2. 11
      wikibase/Cargo.toml
  3. 85
      wikibase/src/loader.rs
  4. 318
      wikibase/src/main.rs

@ -11,9 +11,9 @@ SPARQL server based on Oxigraph
edition = "2018" edition = "2018"
[dependencies] [dependencies]
oxigraph = { path = "../lib", features = ["rocksdb"] }
argh = "0.1" argh = "0.1"
async-std = { version = "1", features = ["attributes"] } async-std = { version = "1", features = ["attributes"] }
async-h1 = "1" async-h1 = "1"
http-types = "1" http-types = "1"
url = "2" oxigraph = { path = "../lib", features = ["rocksdb"] }
url = "2"

@ -11,9 +11,12 @@ SPARQL server based on Oxigraph for Wikibase instances
edition = "2018" edition = "2018"
[dependencies] [dependencies]
oxigraph = {path = "../lib", features = ["rocksdb"] }
argh = "0.1" argh = "0.1"
rouille = "3" async-std = { version = "1", features = ["attributes"] }
reqwest = "0.9" async-h1 = "1"
chrono = "0.4"
http-client = { version = "2.0", features = ["h1_client"] }
http-types = "1"
oxigraph = { path = "../lib", features = ["rocksdb"] }
serde_json = "1" serde_json = "1"
chrono = "0.4" url = "2"

@ -1,20 +1,24 @@
use crate::SERVER; use crate::SERVER;
use async_std::prelude::*;
use async_std::task::block_on;
use chrono::{DateTime, Datelike, Utc}; use chrono::{DateTime, Datelike, Utc};
use http_client::h1::H1Client;
use http_client::HttpClient;
use http_types::{Method, Request, Result};
use oxigraph::model::NamedNode; use oxigraph::model::NamedNode;
use oxigraph::*; use oxigraph::{GraphSyntax, Repository, RepositoryConnection, RepositoryTransaction};
use reqwest::header::USER_AGENT;
use reqwest::{Client, Url};
use serde_json::Value; use serde_json::Value;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::{BufReader, Read}; use std::io::{BufReader, Cursor, Read};
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use url::{form_urlencoded, Url};
pub struct WikibaseLoader<R: Repository + Copy> { pub struct WikibaseLoader<R: Repository + Copy> {
repository: R, repository: R,
api_url: Url, api_url: Url,
entity_data_url: Url, entity_data_url: Url,
client: Client, client: H1Client,
namespaces: Vec<u32>, namespaces: Vec<u32>,
slot: Option<String>, slot: Option<String>,
frequency: Duration, frequency: Duration,
@ -32,10 +36,9 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
repository, repository,
api_url: Url::parse(api_url).map_err(Error::wrap)?, api_url: Url::parse(api_url)?,
entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData")) entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
.map_err(Error::wrap)?, client: H1Client::new(),
client: Client::new(),
namespaces: namespaces.to_vec(), namespaces: namespaces.to_vec(),
slot: slot.map(|t| t.to_owned()), slot: slot.map(|t| t.to_owned()),
start: Utc::now(), start: Utc::now(),
@ -59,6 +62,7 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
parameters.insert("action".to_owned(), "query".to_owned()); parameters.insert("action".to_owned(), "query".to_owned());
parameters.insert("list".to_owned(), "allpages".to_owned()); parameters.insert("list".to_owned(), "allpages".to_owned());
parameters.insert("apnamespace".to_owned(), namespace.to_string()); parameters.insert("apnamespace".to_owned(), namespace.to_string());
parameters.insert("aplimit".to_owned(), "50".to_owned());
self.api_get_with_continue(parameters, |results| { self.api_get_with_continue(parameters, |results| {
println!("*"); println!("*");
@ -81,7 +85,7 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
Ok(data) => { Ok(data) => {
self.load_entity_data( self.load_entity_data(
&(self.entity_data_url.to_string() + "/" + id), &(self.entity_data_url.to_string() + "/" + id),
data, Cursor::new(data),
)?; )?;
} }
Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e), Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e),
@ -127,7 +131,7 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
} }
parameters.insert("rcend".to_owned(), start.to_rfc2822()); parameters.insert("rcend".to_owned(), start.to_rfc2822());
parameters.insert("rcprop".to_owned(), "title|ids".to_owned()); parameters.insert("rcprop".to_owned(), "title|ids".to_owned());
parameters.insert("limit".to_owned(), "50".to_owned()); parameters.insert("rclimit".to_owned(), "50".to_owned());
self.api_get_with_continue(parameters, |results| { self.api_get_with_continue(parameters, |results| {
for change in results for change in results
@ -155,7 +159,10 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
match self.get_entity_data(&id) { match self.get_entity_data(&id) {
Ok(data) => { Ok(data) => {
self.load_entity_data(&format!("{}/{}", self.entity_data_url, id), data)?; self.load_entity_data(
&format!("{}/{}", self.entity_data_url, id),
Cursor::new(data),
)?;
} }
Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e), Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e),
} }
@ -186,29 +193,39 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
fn api_get(&self, parameters: &mut HashMap<String, String>) -> Result<Value> { fn api_get(&self, parameters: &mut HashMap<String, String>) -> Result<Value> {
parameters.insert("format".to_owned(), "json".to_owned()); parameters.insert("format".to_owned(), "json".to_owned());
Ok(self Ok(serde_json::from_slice(
.client &self.get_request(&self.api_url, parameters)?,
.get(self.api_url.clone()) )?)
.query(parameters)
.header(USER_AGENT, SERVER)
.send()
.map_err(Error::wrap)?
.error_for_status()
.map_err(Error::wrap)?
.json()
.map_err(Error::wrap)?)
} }
fn get_entity_data(&self, id: &str) -> Result<impl Read> { fn get_entity_data(&self, id: &str) -> Result<Vec<u8>> {
Ok(self Ok(self.get_request(
.client &self.entity_data_url,
.get(self.entity_data_url.clone()) [("id", id), ("format", "nt"), ("flavor", "dump")]
.query(&[("id", id), ("format", "nt"), ("flavor", "dump")]) .iter()
.header(USER_AGENT, SERVER) .cloned(),
.send() )?)
.map_err(Error::wrap)? }
.error_for_status()
.map_err(Error::wrap)?) fn get_request<K: AsRef<str>, V: AsRef<str>>(
&self,
url: &Url,
params: impl IntoIterator<Item = (K, V)>,
) -> Result<Vec<u8>> {
let mut query_serializer = form_urlencoded::Serializer::new(String::new());
for (k, v) in params {
query_serializer.append_pair(k.as_ref(), v.as_ref());
}
let url = url.join(&("?".to_owned() + &query_serializer.finish()))?;
let mut request = Request::new(Method::Get, url);
request.append_header("user-agent", SERVER)?;
let response = self.client.send(request);
block_on(async {
let mut response = response.await?;
let mut buffer = Vec::new();
response.read_to_end(&mut buffer).await?;
Ok(buffer)
})
} }
fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> { fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
@ -217,7 +234,7 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
connection.transaction(|transaction| { connection.transaction(|transaction| {
let to_remove = connection let to_remove = connection
.quads_for_pattern(None, None, None, Some(Some(&graph_name))) .quads_for_pattern(None, None, None, Some(Some(&graph_name)))
.collect::<Result<Vec<_>>>()?; .collect::<oxigraph::Result<Vec<_>>>()?;
for q in to_remove { for q in to_remove {
transaction.remove(&q)?; transaction.remove(&q)?;
} }

@ -11,18 +11,20 @@
use crate::loader::WikibaseLoader; use crate::loader::WikibaseLoader;
use argh::FromArgs; use argh::FromArgs;
use async_std::future::Future;
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::task::{spawn, spawn_blocking};
use http_types::headers::HeaderName;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use oxigraph::{ use oxigraph::{
FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, RocksDbRepository, FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, RocksDbRepository,
}; };
use rouille::input::priority_header_preferred;
use rouille::url::form_urlencoded;
use rouille::{content_encoding, start_server, Request, Response};
use std::io::Read;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::Duration; use std::time::Duration;
use url::form_urlencoded;
mod loader; mod loader;
@ -57,23 +59,22 @@ struct Args {
slot: Option<String>, slot: Option<String>,
} }
pub fn main() { #[async_std::main]
pub async fn main() -> Result<()> {
let args: Args = argh::from_env(); let args: Args = argh::from_env();
let file = args.file.clone(); let file = args.file.clone();
if let Some(file) = file { if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbRepository::open(file).unwrap()), args) main_with_dataset(Arc::new(RocksDbRepository::open(file)?), args).await
} else { } else {
main_with_dataset(Arc::new(MemoryRepository::default()), args) main_with_dataset(Arc::new(MemoryRepository::default()), args).await
} }
} }
fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, args: Args) async fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, args: Args) -> Result<()>
where where
for<'a> &'a R: Repository, for<'a> &'a R: Repository,
{ {
println!("Listening for requests at http://{}", &args.bind);
let repo = repository.clone(); let repo = repository.clone();
let mediawiki_api = args.mediawiki_api.clone(); let mediawiki_api = args.mediawiki_api.clone();
let mediawiki_base_url = args.mediawiki_base_url.clone(); let mediawiki_base_url = args.mediawiki_base_url.clone();
@ -92,7 +93,7 @@ where
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let slot = args.slot.clone(); let slot = args.slot.clone();
thread::spawn(move || { spawn_blocking(move || {
let mut loader = WikibaseLoader::new( let mut loader = WikibaseLoader::new(
repo.as_ref(), repo.as_ref(),
&mediawiki_api, &mediawiki_api,
@ -106,131 +107,216 @@ where
loader.update_loop(); loader.update_loop();
}); });
start_server(args.bind, move |request| { println!("Listening for requests at http://{}", &args.bind);
content_encoding::apply(
request, http_server(args.bind, move |request| {
handle_request(request, repository.connection().unwrap()), handle_request(request, Arc::clone(&repository))
)
.with_unique_header("Server", SERVER)
}) })
.await
} }
fn handle_request<R: RepositoryConnection>(request: &Request, connection: R) -> Response { async fn handle_request<R: Send + Sync + 'static>(
match (request.url().as_str(), request.method()) { request: Request,
("/query", "GET") => evaluate_urlencoded_sparql_query( repository: Arc<R>,
connection, ) -> Result<Response>
request.raw_query_string().as_bytes(), where
request, for<'a> &'a R: Repository,
), {
("/query", "POST") => { let mut response = match (request.url().path(), request.method()) {
if let Some(body) = request.data() { ("/query", Method::Get) => {
if let Some(content_type) = request.header("Content-Type") { evaluate_urlencoded_sparql_query(
if content_type.starts_with("application/sparql-query") { repository,
let mut buffer = String::default(); request.url().query().unwrap_or("").as_bytes().to_vec(),
body.take(MAX_SPARQL_BODY_SIZE) request,
.read_to_string(&mut buffer) )
.unwrap(); .await?
evaluate_sparql_query(connection, &buffer, request) }
} else if content_type.starts_with("application/x-www-form-urlencoded") { ("/query", Method::Post) => {
let mut buffer = Vec::default(); if let Some(content_type) = request.content_type() {
body.take(MAX_SPARQL_BODY_SIZE) if essence(&content_type) == "application/sparql-query" {
.read_to_end(&mut buffer) let mut buffer = String::new();
.unwrap(); let mut request = request;
evaluate_urlencoded_sparql_query(connection, &buffer, request) request
} else { .take_body()
Response::text(format!( .take(MAX_SPARQL_BODY_SIZE)
"No supported content Content-Type given: {}", .read_to_string(&mut buffer)
content_type .await?;
)) evaluate_sparql_query(repository, buffer, request).await?
.with_status_code(415) } else if essence(&content_type) == "application/x-www-form-urlencoded" {
} let mut buffer = Vec::new();
let mut request = request;
request
.take_body()
.take(MAX_SPARQL_BODY_SIZE)
.read_to_end(&mut buffer)
.await?;
evaluate_urlencoded_sparql_query(repository, buffer, request).await?
} else { } else {
Response::text("No Content-Type given").with_status_code(400) simple_response(
StatusCode::UnsupportedMediaType,
format!("No supported Content-Type given: {}", content_type),
)
} }
} else { } else {
Response::text("No content given").with_status_code(400) simple_response(StatusCode::BadRequest, "No Content-Type given")
} }
} }
_ => Response::empty_404(), _ => Response::new(StatusCode::NotFound),
} };
response.append_header("Server", SERVER)?;
Ok(response)
} }
fn evaluate_urlencoded_sparql_query<R: RepositoryConnection>( /// TODO: bad hack to overcome http_types limitations
connection: R, fn essence(mime: &Mime) -> &str {
encoded: &[u8], mime.essence().split(';').next().unwrap_or("")
request: &Request, }
) -> Response {
if let Some((_, query)) = form_urlencoded::parse(encoded).find(|(k, _)| k == "query") { fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response {
evaluate_sparql_query(connection, &query, request) let mut response = Response::new(status);
response.set_body(body);
response
}
async fn evaluate_urlencoded_sparql_query<R: Send + Sync + 'static>(
repository: Arc<R>,
encoded: Vec<u8>,
request: Request,
) -> Result<Response>
where
for<'a> &'a R: Repository,
{
if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") {
evaluate_sparql_query(repository, query.to_string(), request).await
} else { } else {
Response::text("You should set the 'query' parameter").with_status_code(400) Ok(simple_response(
StatusCode::BadRequest,
"You should set the 'query' parameter",
))
} }
} }
fn evaluate_sparql_query<R: RepositoryConnection>( async fn evaluate_sparql_query<R: Send + Sync + 'static>(
connection: R, repository: Arc<R>,
query: &str, query: String,
request: &Request, request: Request,
) -> Response { ) -> Result<Response>
//TODO: stream where
match connection.prepare_query(query, QueryOptions::default().with_default_graph_as_union()) { for<'a> &'a R: Repository,
Ok(query) => { {
let results = query.exec().unwrap(); spawn_blocking(move || {
if let QueryResult::Graph(_) = results { //TODO: stream
let supported_formats = [ let query = repository
.connection()?
.prepare_query(&query, QueryOptions::default())
.map_err(|e| {
let mut e = Error::from(e);
e.set_status(StatusCode::BadRequest);
e
})?;
let results = query.exec()?;
if let QueryResult::Graph(_) = results {
let format = content_negotiation(
request,
&[
GraphSyntax::NTriples.media_type(), GraphSyntax::NTriples.media_type(),
GraphSyntax::Turtle.media_type(), GraphSyntax::Turtle.media_type(),
GraphSyntax::RdfXml.media_type(), GraphSyntax::RdfXml.media_type(),
]; ],
let format = if let Some(accept) = request.header("Accept") { )?;
if let Some(media_type) =
priority_header_preferred(accept, supported_formats.iter().cloned())
.and_then(|p| GraphSyntax::from_mime_type(supported_formats[p]))
{
media_type
} else {
return Response::text(format!(
"No supported Accept given: {}. Supported format: {:?}",
accept, supported_formats
))
.with_status_code(415);
}
} else {
GraphSyntax::NTriples
};
Response::from_data( let mut response = Response::from(results.write_graph(Vec::default(), format)?);
format.media_type(), response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
results.write_graph(Vec::default(), format).unwrap(), Ok(response)
) } else {
} else { let format = content_negotiation(
let supported_formats = [ request,
&[
QueryResultSyntax::Xml.media_type(), QueryResultSyntax::Xml.media_type(),
QueryResultSyntax::Json.media_type(), QueryResultSyntax::Json.media_type(),
]; ],
let format = if let Some(accept) = request.header("Accept") { )?;
if let Some(media_type) = let mut response = Response::from(results.write(Vec::default(), format)?);
priority_header_preferred(accept, supported_formats.iter().cloned()) response.insert_header(headers::CONTENT_TYPE, format.media_type())?;
.and_then(|p| QueryResultSyntax::from_mime_type(supported_formats[p])) Ok(response)
{ }
media_type })
} else { .await
return Response::text(format!( }
"No supported Accept given: {}. Supported format: {:?}",
accept, supported_formats async fn http_server<
)) F: Clone + Send + Sync + 'static + Fn(Request) -> Fut,
.with_status_code(415); Fut: Send + Future<Output = Result<Response>>,
} >(
} else { host: String,
QueryResultSyntax::Json handle: F,
}; ) -> Result<()> {
async fn accept<F: Fn(Request) -> Fut, Fut: Future<Output = Result<Response>>>(
addr: String,
stream: TcpStream,
handle: F,
) -> Result<()> {
async_h1::accept(&addr, stream, |request| async {
Ok(match handle(request).await {
Ok(result) => result,
Err(error) => simple_response(error.status(), error.to_string()),
})
})
.await
}
Response::from_data( let listener = TcpListener::bind(&host).await?;
format.media_type(), let mut incoming = listener.incoming();
results.write(Vec::default(), format).unwrap(), while let Some(stream) = incoming.next().await {
) let stream = stream?.clone(); //TODO: clone stream?
let handle = handle.clone();
let addr = format!("http://{}", host);
spawn(async {
if let Err(err) = accept(addr, stream, handle).await {
eprintln!("{}", err);
};
});
}
Ok(())
}
fn content_negotiation<F: FileSyntax>(request: Request, supported: &[&str]) -> Result<F> {
let header = request
.header(&HeaderName::from_str("Accept").unwrap())
.and_then(|h| h.last())
.map(|h| h.as_str().trim())
.unwrap_or("");
let supported: Vec<Mime> = supported
.iter()
.map(|h| Mime::from_str(h).unwrap())
.collect();
let mut result = supported.first().unwrap();
let mut result_score = 0f32;
if !header.is_empty() {
for possible in header.split(',') {
let possible = Mime::from_str(possible.trim())?;
let score = if let Some(q) = possible.param("q") {
f32::from_str(q)?
} else {
1.
};
if score <= result_score {
continue;
}
for candidate in &supported {
if (possible.basetype() == candidate.basetype() || possible.basetype() == "*")
&& (possible.subtype() == candidate.subtype() || possible.subtype() == "*")
{
result = candidate;
result_score = score;
break;
}
} }
} }
Err(error) => Response::text(error.to_string()).with_status_code(400),
} }
F::from_mime_type(essence(result))
.ok_or_else(|| Error::from_str(StatusCode::InternalServerError, "Unknown mime type"))
} }

Loading…
Cancel
Save