From 1550acdc3173ca53d770ac0f5e387f47d39565e6 Mon Sep 17 00:00:00 2001
From: Tpt
Date: Tue, 15 Oct 2019 16:20:39 +0200
Subject: [PATCH] Adds a basic Wikibase loader

---
 Cargo.toml              |   3 +-
 README.md               |  28 ++++-
 lib/Cargo.toml          |   2 +-
 lib/src/sparql/model.rs |   8 +-
 lib/src/syntax.rs       |   2 +-
 wikibase/Cargo.toml     |  19 ++++
 wikibase/src/loader.rs  | 212 +++++++++++++++++++++++++++++++++++++
 wikibase/src/main.rs    | 227 ++++++++++++++++++++++++++++++++++++++++
 8 files changed, 495 insertions(+), 6 deletions(-)
 create mode 100644 wikibase/Cargo.toml
 create mode 100644 wikibase/src/loader.rs
 create mode 100644 wikibase/src/main.rs

diff --git a/Cargo.toml b/Cargo.toml
index 72fcf1ed..89367ea6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,8 @@
 [workspace]
 members = [
     "lib",
-    "server"
+    "server",
+    "wikibase"
 ]
 
 [profile.release]
diff --git a/README.md b/README.md
index 1347edb0..f18de1b4 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,10 @@ There is no released version yet.
 
 Its goal is to provide a compliant, safe and fast graph database based on the [RocksDB](https://rocksdb.org/) key-value store.
 It is written in Rust.
-The `lib` directory contains the database written as a Rust library and the `server` directory a stand-alone binary of a web server implementing the [SPARQL 1.1 Protocol](https://www.w3.org/TR/sparql11-protocol/).
+It is split into multiple parts:
+* The `lib` directory contains the database written as a Rust library.
+* The `server` directory contains a stand-alone binary of a web server implementing the [SPARQL 1.1 Protocol](https://www.w3.org/TR/sparql11-protocol/).
+* The `wikibase` directory contains a stand-alone binary of a web server able to synchronize with a [Wikibase instance](https://wikiba.se/).
 
 Are currently implemented:
 * [SPARQL 1.1 Query](https://www.w3.org/TR/sparql11-query/) except `FROM` and `FROM NAMED`.
@@ -45,6 +48,29 @@ It provides the following routes:
 
 Use `rudf_server --help` to see the possible options when starting the server.
 
+## Run the web server for Wikibase
+
+### Build
+You need to have [a recent stable version of Rust and Cargo installed](https://www.rust-lang.org/tools/install).
+
+Once this is done, executing `cargo build --release` in the root directory of this repository should compile the full server after downloading its dependencies.
+It will create a fat binary in `target/release/rudf_wikibase`.
+
+### Usage
+
+To start a server that is synchronized with [test.wikidata.org](https://test.wikidata.org), run:
+```bash
+./rudf_wikibase --mediawiki_api=https://test.wikidata.org/w/api.php --mediawiki_base_url=https://test.wikidata.org/wiki/ --namespaces=0,120 --file=test.wikidata
+```
+
+It creates a SPARQL endpoint listening on `localhost:7878/query` that can be queried just like Blazegraph.
+
+The configuration parameters are:
+* `mediawiki_api`: URL of the MediaWiki API to use.
+* `mediawiki_base_url`: Base URL of MediaWiki pages, like `https://test.wikidata.org/wiki/` for test.wikidata.org or `http://localhost/w/index.php?title=` for "vanilla" installations.
+* `namespaces`: The ids of the Wikibase namespaces to synchronize with, separated by `,`.
+* `file`: Path of the directory where Rudf should store its data.
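+
+For example, once the server is running you can send it a SPARQL query over HTTP. This is a minimal sketch assuming the default bind address; adjust the host, port and query to your setup:
+```bash
+curl -X POST http://localhost:7878/query \
+  -H 'Content-Type: application/sparql-query' \
+  --data 'SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10'
+```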
+
 ## License
diff --git a/lib/Cargo.toml b/lib/Cargo.toml
index eb751db6..cf44eb95 100644
--- a/lib/Cargo.toml
+++ b/lib/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "rudf"
-version = "0.0.1"
+version = "0.1.0"
 authors = ["Tpt"]
 license = "MIT/Apache-2.0"
 readme = "../README.md"
diff --git a/lib/src/sparql/model.rs b/lib/src/sparql/model.rs
index 81c91f98..d38f0f39 100644
--- a/lib/src/sparql/model.rs
+++ b/lib/src/sparql/model.rs
@@ -99,8 +99,12 @@ impl FileSyntax for QueryResultSyntax {
     fn from_mime_type(media_type: &str) -> Option<Self> {
         if let Some(base_type) = media_type.split(';').next() {
             match base_type {
-                "application/sparql-results+xml" => Some(QueryResultSyntax::Xml),
-                "application/sparql-results+json" => Some(QueryResultSyntax::Json),
+                "application/xml" | "application/sparql-results+xml" => {
+                    Some(QueryResultSyntax::Xml)
+                }
+                "application/json" | "application/sparql-results+json" => {
+                    Some(QueryResultSyntax::Json)
+                }
                 _ => None,
             }
         } else {
diff --git a/lib/src/syntax.rs b/lib/src/syntax.rs
index 2e359449..2efb7ba3 100644
--- a/lib/src/syntax.rs
+++ b/lib/src/syntax.rs
@@ -62,7 +62,7 @@ impl FileSyntax for GraphSyntax {
             match base_type {
                 "application/n-triples" => Some(GraphSyntax::NTriples),
                 "text/turtle" => Some(GraphSyntax::Turtle),
-                "application/rdf+xml" => Some(GraphSyntax::RdfXml),
+                "application/xml" | "application/rdf+xml" => Some(GraphSyntax::RdfXml),
                 _ => None,
             }
         } else {
diff --git a/wikibase/Cargo.toml b/wikibase/Cargo.toml
new file mode 100644
index 00000000..6774ba40
--- /dev/null
+++ b/wikibase/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "rudf_wikibase"
+version = "0.1.0"
+authors = ["Tpt"]
+license = "MIT/Apache-2.0"
+readme = "../README.md"
+repository = "https://github.com/Tpt/rudf"
+description = """
+Rudf-based SPARQL server for Wikibase instances
+"""
+edition = "2018"
+
+[dependencies]
+rudf = { path = "../lib", features = ["rocksdb"] }
+clap = "2"
+rouille = "3"
+reqwest = "0.9"
+serde_json = "1"
+chrono = "0.4"
diff --git a/wikibase/src/loader.rs b/wikibase/src/loader.rs
new file mode 100644
index 00000000..bf7a0400
--- /dev/null
+++ b/wikibase/src/loader.rs
@@ -0,0 +1,212 @@
+use crate::SERVER;
+use chrono::{DateTime, Utc};
+use reqwest::header::USER_AGENT;
+use reqwest::{Client, Url};
+use rudf::model::NamedNode;
+use rudf::{GraphSyntax, Repository, RepositoryConnection, Result};
+use serde_json::Value;
+use std::collections::{HashMap, HashSet};
+use std::io::{BufReader, Read};
+use std::thread::sleep;
+use std::time::Duration;
+
+pub struct WikibaseLoader<R: Repository> {
+    repository: R,
+    api_url: Url,
+    entity_data_url: Url,
+    client: Client,
+    namespaces: Vec<u32>,
+    frequency: Duration,
+    start: DateTime<Utc>,
+}
+
+impl<R: Repository> WikibaseLoader<R> {
+    pub fn new(
+        repository: R,
+        api_url: &str,
+        pages_base_url: &str,
+        namespaces: &[u32],
+        frequency: Duration,
+    ) -> Result<Self> {
+        Ok(Self {
+            repository,
+            api_url: Url::parse(api_url)?,
+            entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
+            client: Client::new(),
+            namespaces: namespaces.to_vec(),
+            start: Utc::now(),
+            frequency,
+        })
+    }
+
+    pub fn initial_loading(&mut self) -> Result<()> {
+        println!("Initial loading...");
+
+        self.start = Utc::now();
+
+        for namespace in &self.namespaces {
+            let mut parameters = HashMap::default();
+            parameters.insert("action".to_owned(), "query".to_owned());
+            parameters.insert("list".to_owned(), "allpages".to_owned());
+            parameters.insert("apnamespace".to_owned(), namespace.to_string());
+
+            self.api_get_with_continue(parameters, |results| {
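+                // For each page of the "allpages" listing, derive the entity id from
+                // the page title (the part after the namespace prefix) and import its
+                // RDF serialization into a per-entity named graph.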
println!("*"); + for page in results + .as_object() + .unwrap() + .get("query") + .unwrap() + .get("allpages") + .unwrap() + .as_array() + .unwrap() + { + let desc = page.as_object().unwrap(); + let title = desc.get("title").unwrap().as_str().unwrap(); + + let id = title.split(':').last().unwrap_or(title); + + match self.get_entity_data(id) { + Ok(data) => { + self.load_entity_data( + &(self.entity_data_url.to_string() + "/" + id), + data, + )?; + } + Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e), + } + } + Ok(()) + })?; + } + Ok(()) + } + + pub fn update_loop(&mut self) { + loop { + let new_start = Utc::now(); + if let Err(e) = self.refresh_step(self.start) { + eprintln!("WikibaseLoader error: {}", e); + } + self.start = new_start; + let elapsed = (Utc::now() - self.start).to_std().unwrap(); + if elapsed < self.frequency { + sleep(self.frequency - elapsed); + } + } + } + + fn refresh_step(&self, start: DateTime) -> Result<()> { + let mut seen: HashSet = HashSet::default(); + + let mut parameters = HashMap::default(); + parameters.insert("action".to_owned(), "query".to_owned()); + parameters.insert("list".to_owned(), "recentchanges".to_owned()); + parameters.insert( + "rcnamespace".to_owned(), + self.namespaces + .iter() + .map(|ns| ns.to_string()) + .collect::>() + .join("|"), + ); + parameters.insert("rcend".to_owned(), start.to_rfc2822()); + parameters.insert("rcprop".to_owned(), "title|ids".to_owned()); + parameters.insert("limit".to_owned(), "50".to_owned()); + + self.api_get_with_continue(parameters, |results| { + for change in results + .as_object() + .unwrap() + .get("query") + .unwrap() + .get("recentchanges") + .unwrap() + .as_array() + .unwrap() + { + let desc = change.as_object().unwrap(); + let title = desc.get("title").unwrap().as_str().unwrap(); + + let id = title.split(':').last().unwrap_or(title); + if seen.contains(id) { + continue; + } + seen.insert(id.to_owned()); + + match self.get_entity_data(id) { + Ok(data) => { + self.load_entity_data( + &(self.entity_data_url.to_string() + "/" + id), + data, + )?; + } + Err(e) => eprintln!("Error while retrieving data for entity {}: {}", id, e), + } + } + Ok(()) + }) + } + + fn api_get_with_continue( + &self, + mut parameters: HashMap, + mut on_results: impl FnMut(&Value) -> Result<()>, + ) -> Result<()> { + loop { + let results = self.api_get(&mut parameters)?; + on_results(&results)?; + + if let Some(cont) = results.get("continue") { + for (k, v) in cont.as_object().unwrap().iter() { + parameters.insert(k.to_owned(), v.as_str().unwrap().to_owned()); + } + } else { + return Ok(()); + } + } + } + + fn api_get(&self, parameters: &mut HashMap) -> Result { + parameters.insert("format".to_owned(), "json".to_owned()); + + Ok(self + .client + .get(self.api_url.clone()) + .query(parameters) + .header(USER_AGENT, SERVER) + .send()? + .error_for_status()? + .json()?) + } + + fn get_entity_data(&self, id: &str) -> Result { + Ok(self + .client + .get(self.entity_data_url.clone()) + .query(&[("id", id), ("format", "nt"), ("flavor", "dump")]) + .header(USER_AGENT, SERVER) + .send()? + .error_for_status()?) 
+    }
+
+    fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
+        let mut connection = self.repository.connection()?;
+        let graph_name = NamedNode::parse(uri)?.into();
+
+        let to_remove = connection
+            .quads_for_pattern(None, None, None, Some(Some(&graph_name)))
+            .collect::<Result<Vec<_>>>()?;
+        for q in to_remove {
+            connection.remove(&q)?;
+        }
+
+        connection.load_graph(
+            BufReader::new(data),
+            GraphSyntax::NTriples,
+            Some(&NamedNode::parse(uri)?.into()),
+            None,
+        )
+    }
+}
diff --git a/wikibase/src/main.rs b/wikibase/src/main.rs
new file mode 100644
index 00000000..02d4195f
--- /dev/null
+++ b/wikibase/src/main.rs
@@ -0,0 +1,227 @@
+use crate::loader::WikibaseLoader;
+use clap::App;
+use clap::Arg;
+use clap::ArgMatches;
+use rouille::input::priority_header_preferred;
+use rouille::url::form_urlencoded;
+use rouille::{content_encoding, start_server, Request, Response};
+use rudf::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
+use rudf::{
+    FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, RocksDbRepository,
+};
+use std::io::Read;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+mod loader;
+
+const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576;
+const SERVER: &str = concat!("Rudf/", env!("CARGO_PKG_VERSION"));
+
+pub fn main() {
+    let matches = App::new("Rudf SPARQL server")
+        .arg(
+            Arg::with_name("bind")
+                .long("bind")
+                .short("b")
+                .help("Specify a server socket to bind using the format $(HOST):$(PORT)")
+                .default_value("localhost:7878")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("file")
+                .long("file")
+                .short("f")
+                .help("Directory in which to persist the data. By default, data is kept in memory.")
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("mediawiki_api")
+                .long("mediawiki_api")
+                .help("URL of the MediaWiki API, like https://www.wikidata.org/w/api.php")
+                .required(true)
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("mediawiki_base_url")
+                .long("mediawiki_base_url")
+                .help("Base URL of MediaWiki pages, like https://www.wikidata.org/wiki/")
+                .required(true)
+                .takes_value(true),
+        )
+        .arg(
+            Arg::with_name("namespaces")
+                .long("namespaces")
+                .help("Ids of the Wikibase namespaces to synchronize with, like \"0,120\"")
+                .required(true)
+                .takes_value(true),
+        )
+        .get_matches();
+
+    let file = matches.value_of("file").map(|v| v.to_string());
+    if let Some(file) = file {
+        main_with_dataset(Arc::new(RocksDbRepository::open(file).unwrap()), &matches)
+    } else {
+        main_with_dataset(Arc::new(MemoryRepository::default()), &matches)
+    }
+}
+
+fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, matches: &ArgMatches)
+where
+    for<'a> &'a R: Repository,
+{
+    let addr = matches.value_of("bind").unwrap().to_owned();
+    println!("Listening for requests at http://{}", &addr);
+
+    let repo = repository.clone();
+    let mediawiki_api = matches.value_of("mediawiki_api").unwrap().to_owned();
+    let mediawiki_base_url = matches.value_of("mediawiki_base_url").unwrap().to_owned();
+    let namespaces = matches
+        .value_of("namespaces")
+        .unwrap()
+        .split(',')
+        .map(|t| u32::from_str(t.trim()).unwrap())
+        .collect::<Vec<_>>();
+    thread::spawn(move || {
+        let mut loader = WikibaseLoader::new(
+            repo.as_ref(),
+            &mediawiki_api,
+            &mediawiki_base_url,
+            &namespaces,
+            Duration::new(10, 0),
+        )
+        .unwrap();
+        loader.initial_loading().unwrap();
+        loader.update_loop();
+    });
+
+    start_server(addr.to_string(), move |request| {
+        content_encoding::apply(
+            request,
+            handle_request(request, repository.connection().unwrap()),
+        )
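+        // content_encoding::apply compresses the response according to the request's
+        // Accept-Encoding header; the Server header is then attached to every response.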
+        .with_unique_header("Server", SERVER)
+    })
+}
+
+fn handle_request<R: RepositoryConnection>(request: &Request, connection: R) -> Response {
+    match (request.url().as_str(), request.method()) {
+        ("/query", "GET") => evaluate_urlencoded_sparql_query(
+            connection,
+            request.raw_query_string().as_bytes(),
+            request,
+        ),
+        ("/query", "POST") => {
+            if let Some(body) = request.data() {
+                if let Some(content_type) = request.header("Content-Type") {
+                    if content_type.starts_with("application/sparql-query") {
+                        let mut buffer = String::default();
+                        body.take(MAX_SPARQL_BODY_SIZE)
+                            .read_to_string(&mut buffer)
+                            .unwrap();
+                        evaluate_sparql_query(connection, &buffer, request)
+                    } else if content_type.starts_with("application/x-www-form-urlencoded") {
+                        let mut buffer = Vec::default();
+                        body.take(MAX_SPARQL_BODY_SIZE)
+                            .read_to_end(&mut buffer)
+                            .unwrap();
+                        evaluate_urlencoded_sparql_query(connection, &buffer, request)
+                    } else {
+                        Response::text(format!(
+                            "Unsupported Content-Type given: {}",
+                            content_type
+                        ))
+                        .with_status_code(415)
+                    }
+                } else {
+                    Response::text("No Content-Type given").with_status_code(400)
+                }
+            } else {
+                Response::text("No content given").with_status_code(400)
+            }
+        }
+        _ => Response::empty_404(),
+    }
+}
+
+fn evaluate_urlencoded_sparql_query<R: RepositoryConnection>(
+    connection: R,
+    encoded: &[u8],
+    request: &Request,
+) -> Response {
+    if let Some((_, query)) = form_urlencoded::parse(encoded).find(|(k, _)| k == "query") {
+        evaluate_sparql_query(connection, &query, request)
+    } else {
+        Response::text("You should set the 'query' parameter").with_status_code(400)
+    }
+}
+
+fn evaluate_sparql_query<R: RepositoryConnection>(
+    connection: R,
+    query: &str,
+    request: &Request,
+) -> Response {
+    //TODO: stream
+    match connection.prepare_query(query, QueryOptions::default().with_default_graph_as_union()) {
+        Ok(query) => {
+            let results = query.exec().unwrap();
+            if let QueryResult::Graph(_) = results {
+                let supported_formats = [
+                    GraphSyntax::NTriples.media_type(),
+                    GraphSyntax::Turtle.media_type(),
+                    GraphSyntax::RdfXml.media_type(),
+                ];
+                let format = if let Some(accept) = request.header("Accept") {
+                    if let Some(media_type) =
+                        priority_header_preferred(accept, supported_formats.iter().cloned())
+                            .and_then(|p| GraphSyntax::from_mime_type(supported_formats[p]))
+                    {
+                        media_type
+                    } else {
+                        return Response::text(format!(
+                            "No supported Accept given: {}. Supported formats: {:?}",
+                            accept, supported_formats
+                        ))
+                        .with_status_code(415);
+                    }
+                } else {
+                    GraphSyntax::NTriples
+                };
+
+                Response::from_data(
+                    format.media_type(),
+                    results.write_graph(Vec::default(), format).unwrap(),
+                )
+            } else {
+                let supported_formats = [
+                    QueryResultSyntax::Xml.media_type(),
+                    QueryResultSyntax::Json.media_type(),
+                ];
+                let format = if let Some(accept) = request.header("Accept") {
+                    if let Some(media_type) =
+                        priority_header_preferred(accept, supported_formats.iter().cloned())
+                            .and_then(|p| QueryResultSyntax::from_mime_type(supported_formats[p]))
+                    {
+                        media_type
+                    } else {
+                        return Response::text(format!(
+                            "No supported Accept given: {}. Supported formats: {:?}",
+                            accept, supported_formats
+                        ))
+                        .with_status_code(415);
+                    }
+                } else {
+                    QueryResultSyntax::Json
+                };
+
+                Response::from_data(
+                    format.media_type(),
+                    results.write(Vec::default(), format).unwrap(),
+                )
+            }
+        }
+        Err(error) => Response::text(error.to_string()).with_status_code(400),
+    }
+}