Server: returns query solutions in streaming

Fixes streaming HTTP body writer
pull/190/head
Tpt 3 years ago
parent b63148007a
commit 8ddd80d48b
  1. 1
      Cargo.lock
  2. 1
      server/Cargo.toml
  3. 137
      server/src/main.rs

1
Cargo.lock generated

@ -805,6 +805,7 @@ dependencies = [
"oxigraph", "oxigraph",
"oxiri", "oxiri",
"rand", "rand",
"sparesults",
"url", "url",
] ]

@ -15,6 +15,7 @@ edition = "2021"
oxhttp = "0.1" oxhttp = "0.1"
clap = { version = "3", features = ["derive"] } clap = { version = "3", features = ["derive"] }
oxigraph = { version = "0.3.0-dev", path = "../lib", features = ["http_client"] } oxigraph = { version = "0.3.0-dev", path = "../lib", features = ["http_client"] }
sparesults = { version = "0.1", path="../lib/sparesults", features = ["rdf-star"] }
rand = "0.8" rand = "0.8"
url = "2" url = "2"
oxiri = "0.2" oxiri = "0.2"

@ -14,10 +14,11 @@ use oxhttp::model::{Body, HeaderName, HeaderValue, Request, Response, Status};
use oxhttp::Server; use oxhttp::Server;
use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer};
use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode};
use oxigraph::sparql::{Query, QueryResults, QueryResultsFormat, Update}; use oxigraph::sparql::{Query, QueryResults, Update};
use oxigraph::store::Store; use oxigraph::store::Store;
use oxiri::Iri; use oxiri::Iri;
use rand::random; use rand::random;
use sparesults::{QueryResultsFormat, QueryResultsSerializer};
use std::cell::RefCell; use std::cell::RefCell;
use std::cmp::min; use std::cmp::min;
use std::fs::File; use std::fs::File;
@ -573,53 +574,72 @@ fn evaluate_sparql_query(
Ok(results) => results, Ok(results) => results,
Err(e) => return internal_server_error(e), Err(e) => return internal_server_error(e),
}; };
if let QueryResults::Graph(triples) = results { match results {
let format = match graph_content_negotiation(request) { QueryResults::Solutions(solutions) => {
Ok(format) => format, let format = match query_results_content_negotiation(request) {
Err(response) => return response, Ok(format) => format,
}; Err(response) => return response,
ReadForWrite::build_response( };
move |w| { ReadForWrite::build_response(
Ok(( move |w| {
GraphSerializer::from_format(format).triple_writer(w)?, Ok((
triples, QueryResultsSerializer::from_format(format)
)) .solutions_writer(w, solutions.variables().to_vec())?,
}, solutions,
|(mut writer, mut triples)| { ))
Ok(if let Some(t) = triples.next() { },
writer.write(&t?)?; |(mut writer, mut solutions)| {
Some((writer, triples)) Ok(if let Some(solution) = solutions.next() {
} else { writer.write(&solution?)?;
writer.finish()?; Some((writer, solutions))
None } else {
}) writer.finish()?;
}, None
format.media_type(), })
) },
} else { format.media_type(),
//TODO: stream )
let mut body = Vec::default(); }
let format = match content_negotiation( QueryResults::Boolean(result) => {
request, let format = match query_results_content_negotiation(request) {
&[ Ok(format) => format,
QueryResultsFormat::Xml.media_type(), Err(response) => return response,
QueryResultsFormat::Json.media_type(), };
QueryResultsFormat::Csv.media_type(), let mut body = Vec::new();
QueryResultsFormat::Tsv.media_type(), if let Err(e) =
], QueryResultsSerializer::from_format(format).write_boolean_result(&mut body, result)
QueryResultsFormat::from_media_type, {
) { return internal_server_error(e);
Ok(format) => format, }
Err(response) => return response, Response::builder(Status::OK)
}; .with_header(HeaderName::CONTENT_TYPE, format.media_type())
if let Err(e) = results.write(&mut body, format) { .unwrap()
return internal_server_error(e); .with_body(body)
}
QueryResults::Graph(triples) => {
let format = match graph_content_negotiation(request) {
Ok(format) => format,
Err(response) => return response,
};
ReadForWrite::build_response(
move |w| {
Ok((
GraphSerializer::from_format(format).triple_writer(w)?,
triples,
))
},
|(mut writer, mut triples)| {
Ok(if let Some(t) = triples.next() {
writer.write(&t?)?;
Some((writer, triples))
} else {
writer.finish()?;
None
})
},
format.media_type(),
)
} }
Response::builder(Status::OK)
.with_header(HeaderName::CONTENT_TYPE, format.media_type())
.unwrap()
.with_body(body)
} }
} }
@ -802,6 +822,19 @@ fn dataset_content_negotiation(request: &Request) -> Result<DatasetFormat, Respo
) )
} }
fn query_results_content_negotiation(request: &Request) -> Result<QueryResultsFormat, Response> {
content_negotiation(
request,
&[
QueryResultsFormat::Json.media_type(),
QueryResultsFormat::Xml.media_type(),
QueryResultsFormat::Csv.media_type(),
QueryResultsFormat::Tsv.media_type(),
],
QueryResultsFormat::from_media_type,
)
}
fn content_negotiation<F>( fn content_negotiation<F>(
request: &Request, request: &Request,
supported: &[&str], supported: &[&str],
@ -931,7 +964,7 @@ impl<O, U: (Fn(O) -> std::io::Result<Option<O>>)> Read for ReadForWrite<O, U> {
while self.position == self.buffer.borrow().len() { while self.position == self.buffer.borrow().len() {
// We read more data // We read more data
if let Some(state) = self.state.take() { if let Some(state) = self.state.take() {
self.buffer.as_ref().borrow_mut().clear(); self.buffer.borrow_mut().clear();
self.position = 0; self.position = 0;
self.state = (self.add_more_data)(state)?; self.state = (self.add_more_data)(state)?;
} else { } else {
@ -939,11 +972,9 @@ impl<O, U: (Fn(O) -> std::io::Result<Option<O>>)> Read for ReadForWrite<O, U> {
} }
} }
let buffer = self.buffer.borrow(); let buffer = self.buffer.borrow();
let start = self.position; let len = min(buffer.len() - self.position, buf.len());
let end = min(buffer.len() - self.position, buf.len()); buf[..len].copy_from_slice(&buffer[self.position..self.position + len]);
let len = end - start; self.position += len;
buf[..len].copy_from_slice(&buffer[start..end]);
self.position = end;
Ok(len) Ok(len)
} }
} }

Loading…
Cancel
Save