Adds a basic Wikibase loader

pull/21/head
Tpt · 5 years ago
parent 7ac14cd901
commit 1550acdc31
8 changed files:
1. Cargo.toml (3 changed lines)
2. README.md (28 changed lines)
3. lib/Cargo.toml (2 changed lines)
4. lib/src/sparql/model.rs (8 changed lines)
5. lib/src/syntax.rs (2 changed lines)
6. wikibase/Cargo.toml (19 changed lines)
7. wikibase/src/loader.rs (212 changed lines)
8. wikibase/src/main.rs (227 changed lines)

@@ -1,7 +1,8 @@
 [workspace]
 members = [
     "lib",
-    "server"
+    "server",
+    "wikibase"
 ]
 
 [profile.release]

@@ -12,7 +12,10 @@ There is no released version yet.
 Its goal is to provide a compliant, safe and fast graph database based on the [RocksDB](https://rocksdb.org/) key-value store.
 It is written in Rust.
 
-The `lib` directory contains the database written as a Rust library and the `server` directory a stand-alone binary of a web server implementing the [SPARQL 1.1 Protocol](https://www.w3.org/TR/sparql11-protocol/).
+It is split into multiple parts:
+* The `lib` directory contains the database written as a Rust library.
+* The `server` directory contains a stand-alone binary of a web server implementing the [SPARQL 1.1 Protocol](https://www.w3.org/TR/sparql11-protocol/).
+* The `wikibase` directory contains a stand-alone binary of a web server able to synchronize with a [Wikibase instance](https://wikiba.se/).
 
 Are currently implemented:
 * [SPARQL 1.1 Query](https://www.w3.org/TR/sparql11-query/) except `FROM` and `FROM NAMED`.
@@ -45,6 +48,29 @@ It provides the following routes:
 
 Use `rudf_server --help` to see the possible options when starting the server.
 
+## Run the web server for Wikibase
+
+### Build
+
+You need to have [a recent stable version of Rust and Cargo installed](https://www.rust-lang.org/tools/install).
+
+Once that is done, executing `cargo build --release` in the root directory of this repository should compile the full server after having downloaded its dependencies.
+It will create a fat binary in `target/release/rudf_wikibase`.
+
+### Usage
+
+To start a server that is synchronized with [test.wikidata.org](https://test.wikidata.org) you should run:
+```bash
+./rudf_wikibase --mediawiki_api=https://test.wikidata.org/w/api.php --mediawiki_base_url=https://test.wikidata.org/wiki/ --namespaces=0,120 --file=test.wikidata
+```
+It creates a SPARQL endpoint listening on `localhost:7878/query` that can be queried just like Blazegraph.
+
+The configuration parameters are:
+* `mediawiki_api`: URL of the MediaWiki API to use.
+* `mediawiki_base_url`: Base URL of MediaWiki pages, like `https://test.wikidata.org/wiki/` for test.wikidata.org or `http://localhost/w/index.php?title=` for "vanilla" installations.
+* `namespaces`: The ids of the Wikibase namespaces to synchronize with, separated by `,`.
+* `file`: Path where Rudf should store its data.
+
 ## License
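To check that this endpoint responds, a SPARQL protocol request can be sent with `curl`. This is a minimal sketch, assuming a `rudf_wikibase` instance started as above on the default `localhost:7878`; the counting query is only illustrative:

```bash
# Illustrative smoke test: count the triples loaded so far.
# Assumes the server is listening on the default bind address localhost:7878.
curl -G 'http://localhost:7878/query' \
  --data-urlencode 'query=SELECT (COUNT(*) AS ?c) WHERE { ?s ?p ?o }' \
  -H 'Accept: application/sparql-results+json'
```

When no `Accept` header is sent, the server code below falls back to JSON for solution results and to N-Triples for graph results.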

@@ -1,6 +1,6 @@
 [package]
 name = "rudf"
-version = "0.0.1"
+version = "0.1.0"
 authors = ["Tpt <thomas@pellissier-tanon.fr>"]
 license = "MIT/Apache-2.0"
 readme = "../README.md"

@@ -99,8 +99,12 @@ impl FileSyntax for QueryResultSyntax {
     fn from_mime_type(media_type: &str) -> Option<Self> {
         if let Some(base_type) = media_type.split(';').next() {
             match base_type {
-                "application/sparql-results+xml" => Some(QueryResultSyntax::Xml),
-                "application/sparql-results+json" => Some(QueryResultSyntax::Json),
+                "application/xml" | "application/sparql-results+xml" => {
+                    Some(QueryResultSyntax::Xml)
+                }
+                "application/json" | "application/sparql-results+json" => {
+                    Some(QueryResultSyntax::Json)
+                }
                 _ => None,
             }
         } else {

@@ -62,7 +62,7 @@ impl FileSyntax for GraphSyntax {
             match base_type {
                 "application/n-triples" => Some(GraphSyntax::NTriples),
                 "text/turtle" => Some(GraphSyntax::Turtle),
-                "application/rdf+xml" => Some(GraphSyntax::RdfXml),
+                "application/xml" | "application/rdf+xml" => Some(GraphSyntax::RdfXml),
                 _ => None,
             }
         } else {

@@ -0,0 +1,19 @@
[package]
name = "rudf_wikibase"
version = "0.1.0"
authors = ["Tpt <thomas@pellissier-tanon.fr>"]
license = "MIT/Apache-2.0"
readme = "../README.md"
repository = "https://github.com/Tpt/rudf"
description = """
Rudf based SPARQL server for Wikibase instances
"""
edition = "2018"

[dependencies]
rudf = { path = "../lib", features = ["rocksdb"] }
clap = "2"
rouille = "3"
reqwest = "0.9"
serde_json = "1"
chrono = "0.4"

@@ -0,0 +1,212 @@
use crate::SERVER;
use chrono::{DateTime, Utc};
use reqwest::header::USER_AGENT;
use reqwest::{Client, Url};
use rudf::model::NamedNode;
use rudf::{GraphSyntax, Repository, RepositoryConnection, Result};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::io::{BufReader, Read};
use std::thread::sleep;
use std::time::Duration;
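/// Keeps a Rudf repository synchronized with a Wikibase instance:
/// an initial full load through the MediaWiki `allpages` API, then an
/// update loop driven by the `recentchanges` feed.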
pub struct WikibaseLoader<R: Repository + Copy> {
repository: R,
api_url: Url,
entity_data_url: Url,
client: Client,
namespaces: Vec<u32>,
frequency: Duration,
start: DateTime<Utc>,
}
impl<R: Repository + Copy> WikibaseLoader<R> {
pub fn new(
repository: R,
api_url: &str,
pages_base_url: &str,
namespaces: &[u32],
frequency: Duration,
) -> Result<Self> {
Ok(Self {
repository,
api_url: Url::parse(api_url)?,
entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
client: Client::new(),
namespaces: namespaces.to_vec(),
start: Utc::now(),
frequency,
})
}
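/// Initial full load: pages through the `allpages` API for each
/// configured namespace and loads the RDF of every listed entity.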
pub fn initial_loading(&mut self) -> Result<()> {
println!("Initial loading ");
self.start = Utc::now();
for namespace in &self.namespaces {
let mut parameters = HashMap::default();
parameters.insert("action".to_owned(), "query".to_owned());
parameters.insert("list".to_owned(), "allpages".to_owned());
parameters.insert("apnamespace".to_owned(), namespace.to_string());
self.api_get_with_continue(parameters, |results| {
println!("*");
for page in results
.as_object()
.unwrap()
.get("query")
.unwrap()
.get("allpages")
.unwrap()
.as_array()
.unwrap()
{
let desc = page.as_object().unwrap();
let title = desc.get("title").unwrap().as_str().unwrap();
let id = title.split(':').last().unwrap_or(title);
match self.get_entity_data(id) {
Ok(data) => {
self.load_entity_data(
&(self.entity_data_url.to_string() + "/" + id),
data,
)?;
}
Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e),
}
}
Ok(())
})?;
}
Ok(())
}
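/// Endless polling loop: refreshes the entities changed since the last
/// pass, then sleeps so that passes start at most once per `frequency`.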
pub fn update_loop(&mut self) {
loop {
let new_start = Utc::now();
if let Err(e) = self.refresh_step(self.start) {
eprintln!("WikibaseLoader error: {}", e);
}
self.start = new_start;
let elapsed = (Utc::now() - self.start).to_std().unwrap();
if elapsed < self.frequency {
sleep(self.frequency - elapsed);
}
}
}
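/// A single refresh pass: queries `recentchanges` back to `start` and
/// reloads each changed entity, skipping titles already seen in this pass.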
fn refresh_step(&self, start: DateTime<Utc>) -> Result<()> {
let mut seen: HashSet<String> = HashSet::default();
let mut parameters = HashMap::default();
parameters.insert("action".to_owned(), "query".to_owned());
parameters.insert("list".to_owned(), "recentchanges".to_owned());
parameters.insert(
"rcnamespace".to_owned(),
self.namespaces
.iter()
.map(|ns| ns.to_string())
.collect::<Vec<_>>()
.join("|"),
);
parameters.insert("rcend".to_owned(), start.to_rfc2822());
parameters.insert("rcprop".to_owned(), "title|ids".to_owned());
parameters.insert("limit".to_owned(), "50".to_owned());
self.api_get_with_continue(parameters, |results| {
for change in results
.as_object()
.unwrap()
.get("query")
.unwrap()
.get("recentchanges")
.unwrap()
.as_array()
.unwrap()
{
let desc = change.as_object().unwrap();
let title = desc.get("title").unwrap().as_str().unwrap();
let id = title.split(':').last().unwrap_or(title);
if seen.contains(id) {
continue;
}
seen.insert(id.to_owned());
match self.get_entity_data(id) {
Ok(data) => {
self.load_entity_data(
&(self.entity_data_url.to_string() + "/" + id),
data,
)?;
}
Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e),
}
}
Ok(())
})
}
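/// Runs an API query, following MediaWiki's continuation protocol:
/// while the response carries a `continue` object, its members are fed
/// back as extra parameters to fetch the next batch.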
fn api_get_with_continue(
&self,
mut parameters: HashMap<String, String>,
mut on_results: impl FnMut(&Value) -> Result<()>,
) -> Result<()> {
loop {
let results = self.api_get(&mut parameters)?;
on_results(&results)?;
if let Some(cont) = results.get("continue") {
for (k, v) in cont.as_object().unwrap().iter() {
parameters.insert(k.to_owned(), v.as_str().unwrap().to_owned());
}
} else {
return Ok(());
}
}
}
fn api_get(&self, parameters: &mut HashMap<String, String>) -> Result<Value> {
parameters.insert("format".to_owned(), "json".to_owned());
Ok(self
.client
.get(self.api_url.clone())
.query(parameters)
.header(USER_AGENT, SERVER)
.send()?
.error_for_status()?
.json()?)
}
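/// Fetches the N-Triples serialization of an entity from the
/// Special:EntityData endpoint, using the full "dump" flavor.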
fn get_entity_data(&self, id: &str) -> Result<impl Read> {
Ok(self
.client
.get(self.entity_data_url.clone())
.query(&[("id", id), ("format", "nt"), ("flavor", "dump")])
.header(USER_AGENT, SERVER)
.send()?
.error_for_status()?)
}
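/// Replaces the content of the named graph standing for the entity
/// page: removes all quads currently in that graph, then loads the
/// fresh N-Triples data into it.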
fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
let mut connection = self.repository.connection()?;
let graph_name = NamedNode::parse(uri)?.into();
let to_remove = connection
.quads_for_pattern(None, None, None, Some(Some(&graph_name)))
.collect::<Result<Vec<_>>>()?;
for q in to_remove {
connection.remove(&q)?;
}
connection.load_graph(
BufReader::new(data),
GraphSyntax::NTriples,
Some(&NamedNode::parse(uri)?.into()),
None,
)
}
}
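For reference, the request built by `get_entity_data` above corresponds to a URL of the following shape (a sketch assuming test.wikidata.org and the illustrative entity id `Q42`):

```bash
# What get_entity_data effectively requests (illustrative entity id Q42):
curl 'https://test.wikidata.org/wiki/Special:EntityData?id=Q42&format=nt&flavor=dump'
```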

@@ -0,0 +1,227 @@
use crate::loader::WikibaseLoader;
use clap::App;
use clap::Arg;
use clap::ArgMatches;
use rouille::input::priority_header_preferred;
use rouille::url::form_urlencoded;
use rouille::{content_encoding, start_server, Request, Response};
use rudf::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use rudf::{
FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, RocksDbRepository,
};
use std::io::Read;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
mod loader;
const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576;
const SERVER: &str = concat!("Rudf/", env!("CARGO_PKG_VERSION"));
pub fn main() {
let matches = App::new("Rudf SPARQL server")
.arg(
Arg::with_name("bind")
.long("bind")
.short("b")
.help("Specify a server socket to bind using the format $(HOST):$(PORT)")
.default_value("localhost:7878")
.takes_value(true),
)
.arg(
Arg::with_name("file")
.long("file")
.short("f")
.help("Directory in which persist the data. By default data are kept in memory.")
.takes_value(true),
)
.arg(
Arg::with_name("mediawiki_api")
.long("mediawiki_api")
.help("URL of the MediaWiki API like https://www.wikidata.org/w/api.php.")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("mediawiki_base_url")
.long("mediawiki_base_url")
.help("Base URL of MediaWiki like https://www.wikidata.org/wiki/")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("namespaces")
.long("namespaces")
.help("Namespaces ids, to load in Blazegraph like \"0,120\"")
.required(true)
.takes_value(true),
)
.get_matches();
let file = matches.value_of("file").map(|v| v.to_string());
if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbRepository::open(file).unwrap()), &matches)
} else {
main_with_dataset(Arc::new(MemoryRepository::default()), &matches)
}
}
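// Generic over the repository type so the same code serves both the
// in-memory and the RocksDB backends; the higher-ranked bound lets a
// `&R` behind the `Arc` act as a `Repository`.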
fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, matches: &ArgMatches)
where
for<'a> &'a R: Repository,
{
let addr = matches.value_of("bind").unwrap().to_owned();
println!("Listening for requests at http://{}", &addr);
let repo = repository.clone();
let mediawiki_api = matches.value_of("mediawiki_api").unwrap().to_owned();
let mediawiki_base_url = matches.value_of("mediawiki_base_url").unwrap().to_owned();
let namespaces = matches
.value_of("namespaces")
.unwrap()
.split(',')
.map(|t| u32::from_str(t.trim()).unwrap())
.collect::<Vec<_>>();
thread::spawn(move || {
let mut loader = WikibaseLoader::new(
repo.as_ref(),
&mediawiki_api,
&mediawiki_base_url,
&namespaces,
Duration::new(10, 0),
)
.unwrap();
loader.initial_loading().unwrap();
loader.update_loop();
});
start_server(addr.to_string(), move |request| {
content_encoding::apply(
request,
handle_request(request, repository.connection().unwrap()),
)
.with_unique_header("Server", SERVER)
})
}
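// Routes requests following the SPARQL 1.1 Protocol: GET /query with an
// URL-encoded `query` parameter, or POST /query with either a raw
// `application/sparql-query` body or form-encoded parameters.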
fn handle_request<R: RepositoryConnection>(request: &Request, connection: R) -> Response {
match (request.url().as_str(), request.method()) {
("/query", "GET") => evaluate_urlencoded_sparql_query(
connection,
request.raw_query_string().as_bytes(),
request,
),
("/query", "POST") => {
if let Some(body) = request.data() {
if let Some(content_type) = request.header("Content-Type") {
if content_type.starts_with("application/sparql-query") {
let mut buffer = String::default();
body.take(MAX_SPARQL_BODY_SIZE)
.read_to_string(&mut buffer)
.unwrap();
evaluate_sparql_query(connection, &buffer, request)
} else if content_type.starts_with("application/x-www-form-urlencoded") {
let mut buffer = Vec::default();
body.take(MAX_SPARQL_BODY_SIZE)
.read_to_end(&mut buffer)
.unwrap();
evaluate_urlencoded_sparql_query(connection, &buffer, request)
} else {
Response::text(format!(
"No supported content Content-Type given: {}",
content_type
))
.with_status_code(415)
}
} else {
Response::text("No Content-Type given").with_status_code(400)
}
} else {
Response::text("No content given").with_status_code(400)
}
}
_ => Response::empty_404(),
}
}
fn evaluate_urlencoded_sparql_query<R: RepositoryConnection>(
connection: R,
encoded: &[u8],
request: &Request,
) -> Response {
if let Some((_, query)) = form_urlencoded::parse(encoded).find(|(k, _)| k == "query") {
evaluate_sparql_query(connection, &query, request)
} else {
Response::text("You should set the 'query' parameter").with_status_code(400)
}
}
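// Evaluates the query and negotiates the response format from the
// Accept header: N-Triples, Turtle or RDF/XML for graph results,
// SPARQL XML or JSON for solution and boolean results.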
fn evaluate_sparql_query<R: RepositoryConnection>(
connection: R,
query: &str,
request: &Request,
) -> Response {
//TODO: stream
match connection.prepare_query(query, QueryOptions::default().with_default_graph_as_union()) {
Ok(query) => {
let results = query.exec().unwrap();
if let QueryResult::Graph(_) = results {
let supported_formats = [
GraphSyntax::NTriples.media_type(),
GraphSyntax::Turtle.media_type(),
GraphSyntax::RdfXml.media_type(),
];
let format = if let Some(accept) = request.header("Accept") {
if let Some(media_type) =
priority_header_preferred(accept, supported_formats.iter().cloned())
.and_then(|p| GraphSyntax::from_mime_type(supported_formats[p]))
{
media_type
} else {
return Response::text(format!(
"No supported Accept given: {}. Supported format: {:?}",
accept, supported_formats
))
.with_status_code(415);
}
} else {
GraphSyntax::NTriples
};
Response::from_data(
format.media_type(),
results.write_graph(Vec::default(), format).unwrap(),
)
} else {
let supported_formats = [
QueryResultSyntax::Xml.media_type(),
QueryResultSyntax::Json.media_type(),
];
let format = if let Some(accept) = request.header("Accept") {
if let Some(media_type) =
priority_header_preferred(accept, supported_formats.iter().cloned())
.and_then(|p| QueryResultSyntax::from_mime_type(supported_formats[p]))
{
media_type
} else {
return Response::text(format!(
"No supported Accept given: {}. Supported format: {:?}",
accept, supported_formats
))
.with_status_code(415);
}
} else {
QueryResultSyntax::Json
};
Response::from_data(
format.media_type(),
results.write(Vec::default(), format).unwrap(),
)
}
}
Err(error) => Response::text(error.to_string()).with_status_code(400),
}
}