From 4bbe193736da72aee3cafdc88c89e024ed83f365 Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 14 Nov 2018 14:20:48 +0100 Subject: [PATCH] Allows rudf_server to be persistent with RocksDB --- server/Cargo.toml | 4 +-- server/src/main.rs | 61 ++++++++++++++++++++++++++++++++-------------- 2 files changed, 45 insertions(+), 20 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 194d10c3..48d8829b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ Rudf based SPARQL server """ [dependencies] -rudf = {path = "../lib"} +rudf = {path = "../lib", features=["rocksdb"]} gotham = "0.3" gotham_derive = "0.3" hyper = "0.12" @@ -20,4 +20,4 @@ serde_derive = "1" mime = "0.3" failure = "0.1" url = "1" -clap = "2" \ No newline at end of file +clap = "2" diff --git a/server/src/main.rs b/server/src/main.rs index bbfb5298..68c3a93d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,6 +15,7 @@ extern crate clap; use clap::App; use clap::Arg; +use clap::ArgMatches; use futures::future; use futures::Future; use futures::Stream; @@ -34,7 +35,6 @@ use hyper::Body; use hyper::HeaderMap; use hyper::Response; use hyper::StatusCode; -use rudf::model::Dataset; use rudf::model::Graph; use rudf::rio::ntriples::read_ntriples; use rudf::sparql::algebra::QueryResult; @@ -43,7 +43,9 @@ use rudf::sparql::PreparedQuery; use rudf::sparql::SparqlDataset; use rudf::store::MemoryDataset; use rudf::store::MemoryGraph; +use rudf::store::RocksDbDataset; use std::fs::File; +use std::panic::RefUnwindSafe; use std::sync::Arc; use url::form_urlencoded; @@ -60,9 +62,26 @@ pub fn main() -> Result<(), failure::Error> { .long("ntriples") .help("Load a N-Triples file in the server at startup") .takes_value(true), + ).arg( + Arg::with_name("file") + .long("file") + .short("f") + .help("File in which persist the dataset") + .takes_value(true), ).get_matches(); - let dataset = MemoryDataset::default(); + let file = matches.value_of("file").map(|v| v.to_string()); + if let Some(file) = file { + main_with_dataset(Arc::new(RocksDbDataset::open(file)?), matches) + } else { + main_with_dataset(Arc::new(MemoryDataset::default()), matches) + } +} + +fn main_with_dataset( + dataset: Arc, + matches: ArgMatches, +) -> Result<(), failure::Error> { if let Some(nt_file) = matches.value_of("ntriples") { println!("Loading NTriples file {}", nt_file); let default_graph = dataset.default_graph(); @@ -77,8 +96,8 @@ pub fn main() -> Result<(), failure::Error> { Ok(()) } -fn router(dataset: MemoryDataset) -> Router { - let store = SparqlStore::new(dataset); +fn router(dataset: Arc) -> Router { + let store = SparqlStore(dataset); let middleware = StateMiddleware::new(store); let pipeline = single_middleware(middleware); let (chain, pipelines) = single_pipeline(pipeline); @@ -90,7 +109,7 @@ fn router(dataset: MemoryDataset) -> Router { .to(|mut state: State| -> (State, Response) { let parsed_request = QueryRequest::take_from(&mut state); let response = - evaluate_sparql_query(&mut state, &parsed_request.query.as_bytes()); + evaluate_sparql_query::(&mut state, &parsed_request.query.as_bytes()); (state, response) }); assoc.post().to(|mut state: State| -> Box { @@ -105,13 +124,16 @@ fn router(dataset: MemoryDataset) -> Router { { Some(content_type) => { if content_type == "application/sparql-query" { - evaluate_sparql_query(&mut state, &body.into_bytes()) + evaluate_sparql_query::( + &mut state, + &body.into_bytes(), + ) } else if content_type == "application/x-www-form-urlencoded" { match parse_urlencoded_query_request(&body.into_bytes()) { - Ok(parsed_request) => evaluate_sparql_query( + Ok(parsed_request) => evaluate_sparql_query::( &mut state, &parsed_request.query.as_bytes(), ), @@ -150,17 +172,17 @@ fn router(dataset: MemoryDataset) -> Router { }) } -#[derive(Clone, StateData)] -struct SparqlStore(Arc); +#[derive(StateData)] +struct SparqlStore(Arc); -impl SparqlStore { - fn new(dataset: MemoryDataset) -> Self { - SparqlStore(Arc::new(dataset)) +impl Clone for SparqlStore { + fn clone(&self) -> Self { + SparqlStore(self.0.clone()) } } -impl AsRef for SparqlStore { - fn as_ref(&self) -> &MemoryDataset { +impl AsRef for SparqlStore { + fn as_ref(&self) -> &D { &*self.0 } } @@ -178,8 +200,11 @@ fn parse_urlencoded_query_request(query: &[u8]) -> Result Response { - let dataset = SparqlStore::take_from(state); +fn evaluate_sparql_query( + state: &mut State, + query: &[u8], +) -> Response { + let dataset: SparqlStore = SparqlStore::take_from(state); match dataset.as_ref().prepare_query(query) { Ok(query) => match query.exec() { Ok(QueryResult::Graph(triples)) => { @@ -216,7 +241,7 @@ mod tests { #[test] fn get_query() { - let test_server = TestServer::new(router(MemoryDataset::default())).unwrap(); + let test_server = TestServer::new(router(Arc::new(MemoryDataset::default()))).unwrap(); let response = test_server .client() .get("http://localhost/query?query=SELECT+*+WHERE+{+?s+?p+?o+}") @@ -227,7 +252,7 @@ mod tests { #[test] fn post_query() { - let test_server = TestServer::new(router(MemoryDataset::default())).unwrap(); + let test_server = TestServer::new(router(Arc::new(MemoryDataset::default()))).unwrap(); let response = test_server .client() .post(