Oxigraph server: streams RDF triples/quads responses

pull/171/head
Tpt 3 years ago
parent 73f04b2baf
commit fe4c330d22
  1. 162
      server/src/main.rs

@ -10,9 +10,9 @@
)] )]
use clap::{App, Arg}; use clap::{App, Arg};
use oxhttp::model::{HeaderName, HeaderValue, Request, Response, Status}; use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status};
use oxhttp::Server; use oxhttp::Server;
use oxigraph::io::{DatasetFormat, GraphFormat}; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer};
use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode};
use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat, Update}; use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat, Update};
use oxigraph::store::Store; use oxigraph::store::Store;
@ -20,7 +20,7 @@ use oxiri::Iri;
use rand::random; use rand::random;
use std::cell::RefCell; use std::cell::RefCell;
use std::cmp::min; use std::cmp::min;
use std::io::{BufReader, Read, Write}; use std::io::{BufReader, Error, ErrorKind, Read, Write};
use std::rc::Rc; use std::rc::Rc;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
@ -152,9 +152,7 @@ fn handle_request(request: &mut Request, store: Store) -> Response {
} }
} }
(path, "GET") if path.starts_with("/store") => { (path, "GET") if path.starts_with("/store") => {
//TODO: stream if let Some(target) = match store_target(request) {
let mut body = Vec::default();
let format = if let Some(target) = match store_target(request) {
Ok(target) => target, Ok(target) => target,
Err(error) => return error, Err(error) => return error,
} { } {
@ -174,26 +172,54 @@ fn handle_request(request: &mut Request, store: Store) -> Response {
Ok(format) => format, Ok(format) => format,
Err(response) => return response, Err(response) => return response,
}; };
if let Err(e) = let triples = store.quads_for_pattern(
store.dump_graph(&mut body, format, GraphName::from(target).as_ref()) None,
{ None,
return internal_server_error(e); None,
} Some(GraphName::from(target).as_ref()),
format.media_type() );
ReadForWrite::build_response(
move |w| {
Ok((
GraphSerializer::from_format(format).triple_writer(w)?,
triples,
))
},
|(mut writer, mut triples)| {
Ok(if let Some(t) = triples.next() {
writer.write(&t?.into())?;
Some((writer, triples))
} else {
writer.finish()?;
None
})
},
format.media_type(),
)
} else { } else {
let format = match dataset_content_negotiation(request) { let format = match dataset_content_negotiation(request) {
Ok(format) => format, Ok(format) => format,
Err(response) => return response, Err(response) => return response,
}; };
if let Err(e) = store.dump_dataset(&mut body, format) { ReadForWrite::build_response(
return internal_server_error(e); move |w| {
} Ok((
format.media_type() DatasetSerializer::from_format(format).quad_writer(w)?,
}; store.iter(),
Response::builder(Status::OK) ))
.with_header(HeaderName::CONTENT_TYPE, format) },
.unwrap() |(mut writer, mut quads)| {
.with_body(body) Ok(if let Some(q) = quads.next() {
writer.write(&q?)?;
Some((writer, quads))
} else {
writer.finish()?;
None
})
},
format.media_type(),
)
}
} }
(path, "PUT") if path.starts_with("/store") => { (path, "PUT") if path.starts_with("/store") => {
if let Some(content_type) = content_type(request) { if let Some(content_type) = content_type(request) {
@ -502,18 +528,32 @@ fn evaluate_sparql_query(
Ok(results) => results, Ok(results) => results,
Err(e) => return internal_server_error(e), Err(e) => return internal_server_error(e),
}; };
//TODO: stream if let QueryResults::Graph(triples) = results {
let mut body = Vec::default();
let media_type = if let QueryResults::Graph(_) = results {
let format = match graph_content_negotiation(request) { let format = match graph_content_negotiation(request) {
Ok(format) => format, Ok(format) => format,
Err(response) => return response, Err(response) => return response,
}; };
if let Err(e) = results.write_graph(&mut body, format) { ReadForWrite::build_response(
return internal_server_error(e); move |w| {
} Ok((
format.media_type() GraphSerializer::from_format(format).triple_writer(w)?,
triples,
))
},
|(mut writer, mut triples)| {
Ok(if let Some(t) = triples.next() {
writer.write(&t.map_err(|e| Error::new(ErrorKind::Other, e))?)?;
Some((writer, triples))
} else {
writer.finish()?;
None
})
},
format.media_type(),
)
} else { } else {
//TODO: stream
let mut body = Vec::default();
let format = match content_negotiation( let format = match content_negotiation(
request, request,
&[ &[
@ -530,12 +570,12 @@ fn evaluate_sparql_query(
if let Err(e) = results.write(&mut body, format) { if let Err(e) = results.write(&mut body, format) {
return internal_server_error(e); return internal_server_error(e);
} }
format.media_type()
}; Response::builder(Status::OK)
Response::builder(Status::OK) .with_header(HeaderName::CONTENT_TYPE, format.media_type())
.with_header(HeaderName::CONTENT_TYPE, media_type) .unwrap()
.unwrap() .with_body(body)
.with_body(body) }
} }
fn configure_and_evaluate_sparql_update( fn configure_and_evaluate_sparql_update(
@ -810,31 +850,47 @@ fn internal_server_error(message: impl ToString) -> Response {
} }
/// Hacky tool to allow implementing read on top of a write loop /// Hacky tool to allow implementing read on top of a write loop
struct ReadForWrite<U: (FnMut() -> std::io::Result<ReadForWriteState>)> { struct ReadForWrite<O, U: (Fn(O) -> std::io::Result<Option<O>>)> {
buffer: Rc<RefCell<Vec<u8>>>, buffer: Rc<RefCell<Vec<u8>>>,
position: usize, position: usize,
write_operator: U, add_more_data: U,
state: Option<O>,
} }
impl<U: (FnMut() -> std::io::Result<ReadForWriteState>)> ReadForWrite<U> { impl<O: 'static, U: (Fn(O) -> std::io::Result<Option<O>>) + 'static> ReadForWrite<O, U> {
fn new(write_operator_builder: impl FnOnce(ReadForWriteWriter) -> U) -> Self { fn build_response(
initial_state_builder: impl FnOnce(ReadForWriteWriter) -> std::io::Result<O>,
add_more_data: U,
content_type: &'static str,
) -> Response {
let buffer = Rc::new(RefCell::new(Vec::new())); let buffer = Rc::new(RefCell::new(Vec::new()));
Self { match initial_state_builder(ReadForWriteWriter {
buffer: buffer.clone(), buffer: buffer.clone(),
position: 0, }) {
write_operator: write_operator_builder(ReadForWriteWriter { buffer }), Ok(state) => Response::builder(Status::OK)
.with_header(HeaderName::CONTENT_TYPE, content_type)
.unwrap()
.with_body(Self {
buffer,
position: 0,
add_more_data,
state: Some(state),
}),
Err(e) => internal_server_error(e),
} }
} }
} }
impl<U: (FnMut() -> std::io::Result<ReadForWriteState>)> Read for ReadForWrite<U> { impl<O, U: (Fn(O) -> std::io::Result<Option<O>>)> Read for ReadForWrite<O, U> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
while self.position == self.buffer.borrow().len() { while self.position == self.buffer.borrow().len() {
// We read more data // We read more data
self.buffer.as_ref().borrow_mut().clear(); if let Some(state) = self.state.take() {
self.position = 0; self.buffer.as_ref().borrow_mut().clear();
if (self.write_operator)()? == ReadForWriteState::End { self.position = 0;
break; self.state = (self.add_more_data)(state)?;
} else {
return Ok(0); // End
} }
} }
let buffer = self.buffer.borrow(); let buffer = self.buffer.borrow();
@ -847,6 +903,14 @@ impl<U: (FnMut() -> std::io::Result<ReadForWriteState>)> Read for ReadForWrite<U
} }
} }
impl<O: 'static, U: (Fn(O) -> std::io::Result<Option<O>>) + 'static> From<ReadForWrite<O, U>>
for Body
{
fn from(body: ReadForWrite<O, U>) -> Self {
Body::from_read(body)
}
}
struct ReadForWriteWriter { struct ReadForWriteWriter {
buffer: Rc<RefCell<Vec<u8>>>, buffer: Rc<RefCell<Vec<u8>>>,
} }
@ -861,12 +925,6 @@ impl Write for ReadForWriteWriter {
} }
} }
#[derive(Eq, PartialEq, Copy, Clone)]
enum ReadForWriteState {
Middle,
End,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

Loading…
Cancel
Save