diff --git a/server/src/main.rs b/server/src/main.rs index 8b0be8f4..05fcef7e 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -10,9 +10,9 @@ )] use clap::{App, Arg}; -use oxhttp::model::{HeaderName, HeaderValue, Request, Response, Status}; +use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status}; use oxhttp::Server; -use oxigraph::io::{DatasetFormat, GraphFormat}; +use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat, Update}; use oxigraph::store::Store; @@ -20,7 +20,7 @@ use oxiri::Iri; use rand::random; use std::cell::RefCell; use std::cmp::min; -use std::io::{BufReader, Read, Write}; +use std::io::{BufReader, Error, ErrorKind, Read, Write}; use std::rc::Rc; use std::str::FromStr; use std::time::Duration; @@ -152,9 +152,7 @@ fn handle_request(request: &mut Request, store: Store) -> Response { } } (path, "GET") if path.starts_with("/store") => { - //TODO: stream - let mut body = Vec::default(); - let format = if let Some(target) = match store_target(request) { + if let Some(target) = match store_target(request) { Ok(target) => target, Err(error) => return error, } { @@ -174,26 +172,54 @@ fn handle_request(request: &mut Request, store: Store) -> Response { Ok(format) => format, Err(response) => return response, }; - if let Err(e) = - store.dump_graph(&mut body, format, GraphName::from(target).as_ref()) - { - return internal_server_error(e); - } - format.media_type() + let triples = store.quads_for_pattern( + None, + None, + None, + Some(GraphName::from(target).as_ref()), + ); + ReadForWrite::build_response( + move |w| { + Ok(( + GraphSerializer::from_format(format).triple_writer(w)?, + triples, + )) + }, + |(mut writer, mut triples)| { + Ok(if let Some(t) = triples.next() { + writer.write(&t?.into())?; + Some((writer, triples)) + } else { + writer.finish()?; + None + }) + }, + format.media_type(), + ) } else { let format = match dataset_content_negotiation(request) { Ok(format) => format, Err(response) => return response, }; - if let Err(e) = store.dump_dataset(&mut body, format) { - return internal_server_error(e); - } - format.media_type() - }; - Response::builder(Status::OK) - .with_header(HeaderName::CONTENT_TYPE, format) - .unwrap() - .with_body(body) + ReadForWrite::build_response( + move |w| { + Ok(( + DatasetSerializer::from_format(format).quad_writer(w)?, + store.iter(), + )) + }, + |(mut writer, mut quads)| { + Ok(if let Some(q) = quads.next() { + writer.write(&q?)?; + Some((writer, quads)) + } else { + writer.finish()?; + None + }) + }, + format.media_type(), + ) + } } (path, "PUT") if path.starts_with("/store") => { if let Some(content_type) = content_type(request) { @@ -502,18 +528,32 @@ fn evaluate_sparql_query( Ok(results) => results, Err(e) => return internal_server_error(e), }; - //TODO: stream - let mut body = Vec::default(); - let media_type = if let QueryResults::Graph(_) = results { + if let QueryResults::Graph(triples) = results { let format = match graph_content_negotiation(request) { Ok(format) => format, Err(response) => return response, }; - if let Err(e) = results.write_graph(&mut body, format) { - return internal_server_error(e); - } - format.media_type() + ReadForWrite::build_response( + move |w| { + Ok(( + GraphSerializer::from_format(format).triple_writer(w)?, + triples, + )) + }, + |(mut writer, mut triples)| { + Ok(if let Some(t) = triples.next() { + writer.write(&t.map_err(|e| Error::new(ErrorKind::Other, e))?)?; + Some((writer, triples)) + } else { + writer.finish()?; + None + }) + }, + format.media_type(), + ) } else { + //TODO: stream + let mut body = Vec::default(); let format = match content_negotiation( request, &[ @@ -530,12 +570,12 @@ fn evaluate_sparql_query( if let Err(e) = results.write(&mut body, format) { return internal_server_error(e); } - format.media_type() - }; - Response::builder(Status::OK) - .with_header(HeaderName::CONTENT_TYPE, media_type) - .unwrap() - .with_body(body) + + Response::builder(Status::OK) + .with_header(HeaderName::CONTENT_TYPE, format.media_type()) + .unwrap() + .with_body(body) + } } fn configure_and_evaluate_sparql_update( @@ -810,31 +850,47 @@ fn internal_server_error(message: impl ToString) -> Response { } /// Hacky tool to allow implementing read on top of a write loop -struct ReadForWrite std::io::Result)> { +struct ReadForWrite std::io::Result>)> { buffer: Rc>>, position: usize, - write_operator: U, + add_more_data: U, + state: Option, } -impl std::io::Result)> ReadForWrite { - fn new(write_operator_builder: impl FnOnce(ReadForWriteWriter) -> U) -> Self { +impl std::io::Result>) + 'static> ReadForWrite { + fn build_response( + initial_state_builder: impl FnOnce(ReadForWriteWriter) -> std::io::Result, + add_more_data: U, + content_type: &'static str, + ) -> Response { let buffer = Rc::new(RefCell::new(Vec::new())); - Self { + match initial_state_builder(ReadForWriteWriter { buffer: buffer.clone(), - position: 0, - write_operator: write_operator_builder(ReadForWriteWriter { buffer }), + }) { + Ok(state) => Response::builder(Status::OK) + .with_header(HeaderName::CONTENT_TYPE, content_type) + .unwrap() + .with_body(Self { + buffer, + position: 0, + add_more_data, + state: Some(state), + }), + Err(e) => internal_server_error(e), } } } -impl std::io::Result)> Read for ReadForWrite { +impl std::io::Result>)> Read for ReadForWrite { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { while self.position == self.buffer.borrow().len() { // We read more data - self.buffer.as_ref().borrow_mut().clear(); - self.position = 0; - if (self.write_operator)()? == ReadForWriteState::End { - break; + if let Some(state) = self.state.take() { + self.buffer.as_ref().borrow_mut().clear(); + self.position = 0; + self.state = (self.add_more_data)(state)?; + } else { + return Ok(0); // End } } let buffer = self.buffer.borrow(); @@ -847,6 +903,14 @@ impl std::io::Result)> Read for ReadForWrite std::io::Result>) + 'static> From> + for Body +{ + fn from(body: ReadForWrite) -> Self { + Body::from_read(body) + } +} + struct ReadForWriteWriter { buffer: Rc>>, } @@ -861,12 +925,6 @@ impl Write for ReadForWriteWriter { } } -#[derive(Eq, PartialEq, Copy, Clone)] -enum ReadForWriteState { - Middle, - End, -} - #[cfg(test)] mod tests { use super::*;