Allows rudf_server to be persistent with RocksDB

pull/10/head
Tpt 6 years ago
parent 0290a2ce4b
commit 4bbe193736
  1. 4
      server/Cargo.toml
  2. 61
      server/src/main.rs

@ -10,7 +10,7 @@ Rudf based SPARQL server
""" """
[dependencies] [dependencies]
rudf = {path = "../lib"} rudf = {path = "../lib", features=["rocksdb"]}
gotham = "0.3" gotham = "0.3"
gotham_derive = "0.3" gotham_derive = "0.3"
hyper = "0.12" hyper = "0.12"
@ -20,4 +20,4 @@ serde_derive = "1"
mime = "0.3" mime = "0.3"
failure = "0.1" failure = "0.1"
url = "1" url = "1"
clap = "2" clap = "2"

@ -15,6 +15,7 @@ extern crate clap;
use clap::App; use clap::App;
use clap::Arg; use clap::Arg;
use clap::ArgMatches;
use futures::future; use futures::future;
use futures::Future; use futures::Future;
use futures::Stream; use futures::Stream;
@ -34,7 +35,6 @@ use hyper::Body;
use hyper::HeaderMap; use hyper::HeaderMap;
use hyper::Response; use hyper::Response;
use hyper::StatusCode; use hyper::StatusCode;
use rudf::model::Dataset;
use rudf::model::Graph; use rudf::model::Graph;
use rudf::rio::ntriples::read_ntriples; use rudf::rio::ntriples::read_ntriples;
use rudf::sparql::algebra::QueryResult; use rudf::sparql::algebra::QueryResult;
@ -43,7 +43,9 @@ use rudf::sparql::PreparedQuery;
use rudf::sparql::SparqlDataset; use rudf::sparql::SparqlDataset;
use rudf::store::MemoryDataset; use rudf::store::MemoryDataset;
use rudf::store::MemoryGraph; use rudf::store::MemoryGraph;
use rudf::store::RocksDbDataset;
use std::fs::File; use std::fs::File;
use std::panic::RefUnwindSafe;
use std::sync::Arc; use std::sync::Arc;
use url::form_urlencoded; use url::form_urlencoded;
@ -60,9 +62,26 @@ pub fn main() -> Result<(), failure::Error> {
.long("ntriples") .long("ntriples")
.help("Load a N-Triples file in the server at startup") .help("Load a N-Triples file in the server at startup")
.takes_value(true), .takes_value(true),
).arg(
Arg::with_name("file")
.long("file")
.short("f")
.help("File in which persist the dataset")
.takes_value(true),
).get_matches(); ).get_matches();
let dataset = MemoryDataset::default(); let file = matches.value_of("file").map(|v| v.to_string());
if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbDataset::open(file)?), matches)
} else {
main_with_dataset(Arc::new(MemoryDataset::default()), matches)
}
}
fn main_with_dataset<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
dataset: Arc<D>,
matches: ArgMatches,
) -> Result<(), failure::Error> {
if let Some(nt_file) = matches.value_of("ntriples") { if let Some(nt_file) = matches.value_of("ntriples") {
println!("Loading NTriples file {}", nt_file); println!("Loading NTriples file {}", nt_file);
let default_graph = dataset.default_graph(); let default_graph = dataset.default_graph();
@ -77,8 +96,8 @@ pub fn main() -> Result<(), failure::Error> {
Ok(()) Ok(())
} }
fn router(dataset: MemoryDataset) -> Router { fn router<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(dataset: Arc<D>) -> Router {
let store = SparqlStore::new(dataset); let store = SparqlStore(dataset);
let middleware = StateMiddleware::new(store); let middleware = StateMiddleware::new(store);
let pipeline = single_middleware(middleware); let pipeline = single_middleware(middleware);
let (chain, pipelines) = single_pipeline(pipeline); let (chain, pipelines) = single_pipeline(pipeline);
@ -90,7 +109,7 @@ fn router(dataset: MemoryDataset) -> Router {
.to(|mut state: State| -> (State, Response<Body>) { .to(|mut state: State| -> (State, Response<Body>) {
let parsed_request = QueryRequest::take_from(&mut state); let parsed_request = QueryRequest::take_from(&mut state);
let response = let response =
evaluate_sparql_query(&mut state, &parsed_request.query.as_bytes()); evaluate_sparql_query::<D>(&mut state, &parsed_request.query.as_bytes());
(state, response) (state, response)
}); });
assoc.post().to(|mut state: State| -> Box<HandlerFuture> { assoc.post().to(|mut state: State| -> Box<HandlerFuture> {
@ -105,13 +124,16 @@ fn router(dataset: MemoryDataset) -> Router {
{ {
Some(content_type) => { Some(content_type) => {
if content_type == "application/sparql-query" { if content_type == "application/sparql-query" {
evaluate_sparql_query(&mut state, &body.into_bytes()) evaluate_sparql_query::<D>(
&mut state,
&body.into_bytes(),
)
} else if content_type } else if content_type
== "application/x-www-form-urlencoded" == "application/x-www-form-urlencoded"
{ {
match parse_urlencoded_query_request(&body.into_bytes()) match parse_urlencoded_query_request(&body.into_bytes())
{ {
Ok(parsed_request) => evaluate_sparql_query( Ok(parsed_request) => evaluate_sparql_query::<D>(
&mut state, &mut state,
&parsed_request.query.as_bytes(), &parsed_request.query.as_bytes(),
), ),
@ -150,17 +172,17 @@ fn router(dataset: MemoryDataset) -> Router {
}) })
} }
#[derive(Clone, StateData)] #[derive(StateData)]
struct SparqlStore(Arc<MemoryDataset>); struct SparqlStore<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(Arc<D>);
impl SparqlStore { impl<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static> Clone for SparqlStore<D> {
fn new(dataset: MemoryDataset) -> Self { fn clone(&self) -> Self {
SparqlStore(Arc::new(dataset)) SparqlStore(self.0.clone())
} }
} }
impl AsRef<MemoryDataset> for SparqlStore { impl<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static> AsRef<D> for SparqlStore<D> {
fn as_ref(&self) -> &MemoryDataset { fn as_ref(&self) -> &D {
&*self.0 &*self.0
} }
} }
@ -178,8 +200,11 @@ fn parse_urlencoded_query_request(query: &[u8]) -> Result<QueryRequest, failure:
}).ok_or_else(|| format_err!("'query' parameter not found")) }).ok_or_else(|| format_err!("'query' parameter not found"))
} }
fn evaluate_sparql_query(state: &mut State, query: &[u8]) -> Response<Body> { fn evaluate_sparql_query<D: SparqlDataset + Send + Sync + RefUnwindSafe + 'static>(
let dataset = SparqlStore::take_from(state); state: &mut State,
query: &[u8],
) -> Response<Body> {
let dataset: SparqlStore<D> = SparqlStore::take_from(state);
match dataset.as_ref().prepare_query(query) { match dataset.as_ref().prepare_query(query) {
Ok(query) => match query.exec() { Ok(query) => match query.exec() {
Ok(QueryResult::Graph(triples)) => { Ok(QueryResult::Graph(triples)) => {
@ -216,7 +241,7 @@ mod tests {
#[test] #[test]
fn get_query() { fn get_query() {
let test_server = TestServer::new(router(MemoryDataset::default())).unwrap(); let test_server = TestServer::new(router(Arc::new(MemoryDataset::default()))).unwrap();
let response = test_server let response = test_server
.client() .client()
.get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}") .get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}")
@ -227,7 +252,7 @@ mod tests {
#[test] #[test]
fn post_query() { fn post_query() {
let test_server = TestServer::new(router(MemoryDataset::default())).unwrap(); let test_server = TestServer::new(router(Arc::new(MemoryDataset::default()))).unwrap();
let response = test_server let response = test_server
.client() .client()
.post( .post(

Loading…
Cancel
Save