Server: Adds serve-read-only and serve-secondary commands

pull/409/head
Tpt 2 years ago committed by Thomas Tanon
parent b2385509a6
commit df55148355
  1. 145
      server/src/main.rs

@ -37,31 +37,58 @@ const LOGO: &str = include_str!("../logo.svg");
#[command(about, version)] #[command(about, version)]
/// Oxigraph SPARQL server. /// Oxigraph SPARQL server.
struct Args { struct Args {
/// Directory in which persist the data.
#[arg(short, long, global = true)]
location: Option<PathBuf>,
/// Open underlying database in secondary mode, specify path to store info logs (LOG)
// see https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances
#[arg(short, long, global = true, conflicts_with = "readonly")]
secondary_location: Option<PathBuf>,
/// Open underlying database in read only mode
// see https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances
#[arg(short, long, global = true, conflicts_with = "secondary_location")]
readonly: bool,
#[command(subcommand)] #[command(subcommand)]
command: Command, command: Command,
} }
#[derive(Subcommand)] #[derive(Subcommand)]
enum Command { enum Command {
/// Start Oxigraph HTTP server. /// Start Oxigraph HTTP server in read-write mode.
Serve { Serve {
/// Directory in which the data stored by Oxigraph should be persisted.
///
/// If not present. An in-memory storage will be used.
#[arg(short, long, global = true)]
location: Option<PathBuf>,
/// Host and port to listen to. /// Host and port to listen to.
#[arg(short, long, default_value = "localhost:7878", global = true)] #[arg(short, long, default_value = "localhost:7878", global = true)]
bind: String, bind: String,
}, },
/// Start Oxigraph HTTP server in read-only mode.
///
/// It allows to read the database while other processes are accessing it.
/// Changes done after this process has been launched will not be seen.
ServeReadOnly {
/// Directory in which the data stored by Oxigraph are persisted.
#[arg(short, long)]
location: PathBuf,
/// Host and port to listen to.
#[arg(short, long, default_value = "localhost:7878")]
bind: String,
},
/// Start Oxigraph HTTP server in secondary mode.
///
/// It allows to read the database while other processes are accessing it.
/// Changes done while this process is running will be replicated after a possible lag.
///
/// Beware: RocksDB secondary mode does not support snapshots and transactions.
/// Dirty reads might happen.
ServeSecondary {
/// Directory where the primary Oxigraph instance is writing to.
#[arg(long)]
primary_location: PathBuf,
/// Directory to which the current secondary instance might write to.
#[arg(long)]
secondary_location: PathBuf,
/// Host and port to listen to.
#[arg(short, long, default_value = "localhost:7878")]
bind: String,
},
/// Load file(s) into the store. /// Load file(s) into the store.
Load { Load {
/// Directory in which the loaded data should be persisted.
#[arg(short, long, global = true)]
location: Option<PathBuf>,
/// file(s) to load. /// file(s) to load.
/// ///
/// If multiple files are provided they are loaded in parallel. /// If multiple files are provided they are loaded in parallel.
@ -74,7 +101,7 @@ enum Command {
/// Can be an extension like "nt" or a MIME type like "application/n-triples". /// Can be an extension like "nt" or a MIME type like "application/n-triples".
/// ///
/// By default the format is guessed from the loaded file extension. /// By default the format is guessed from the loaded file extension.
#[arg(long, global = true, required_unless_present = "file")] #[arg(long, required_unless_present = "file")]
format: Option<String>, format: Option<String>,
/// Attempt to keep loading even if the data file is invalid. /// Attempt to keep loading even if the data file is invalid.
/// ///
@ -86,32 +113,49 @@ enum Command {
/// By default the default graph is used. /// By default the default graph is used.
/// ///
/// Only available when loading a graph file (N-Triples, Turtle...) and not a dataset file (N-Quads, TriG...). /// Only available when loading a graph file (N-Triples, Turtle...) and not a dataset file (N-Quads, TriG...).
#[arg(long, global = true)] #[arg(long)]
graph: Option<String>, graph: Option<String>,
}, },
} }
pub fn main() -> anyhow::Result<()> { pub fn main() -> anyhow::Result<()> {
let matches = Args::parse(); let matches = Args::parse();
let store = if let Some(path) = &matches.location {
if let Some(secondary_path) = &matches.secondary_location {
Store::open_secondary(path, secondary_path)
} else if matches.readonly {
Store::open_read_only(path)
} else {
Store::open(path)
}
} else {
Store::new()
}?;
match matches.command { match matches.command {
Command::Serve { location, bind } => serve(
if let Some(location) = location {
Store::open(&location)
} else {
Store::new()
}?,
bind,
true,
),
Command::ServeReadOnly { location, bind } => {
serve(Store::open_read_only(&location)?, bind, false)
}
Command::ServeSecondary {
primary_location,
secondary_location,
bind,
} => serve(
Store::open_secondary(&primary_location, &secondary_location)?,
bind,
false,
),
Command::Load { Command::Load {
file, file,
location,
lenient, lenient,
format, format,
graph, graph,
} => { } => {
let store = if let Some(location) = location {
Store::open(&location)
} else {
eprintln!("Warning: opening an in-memory store. It will not be possible to read the written data.");
Store::new()
}?;
let format = if let Some(format) = format { let format = if let Some(format) = format {
Some(GraphOrDatasetFormat::from_str(&format)?) Some(GraphOrDatasetFormat::from_str(&format)?)
} else { } else {
@ -229,17 +273,6 @@ pub fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
} }
Command::Serve { bind } => {
let mut server = Server::new(move |request| {
handle_request(request, store.clone())
.unwrap_or_else(|(status, message)| error(status, message))
});
server.set_global_timeout(HTTP_TIMEOUT);
server.set_server_name(concat!("Oxigraph/", env!("CARGO_PKG_VERSION")))?;
eprintln!("Listening for requests at http://{}", &bind);
server.listen(bind)?;
Ok(())
}
} }
} }
@ -332,9 +365,25 @@ impl FromStr for GraphOrDatasetFormat {
} }
} }
fn serve(store: Store, bind: String, allow_writes: bool) -> anyhow::Result<()> {
let mut server = Server::new(move |request| {
handle_request(request, store.clone(), allow_writes)
.unwrap_or_else(|(status, message)| error(status, message))
});
server.set_global_timeout(HTTP_TIMEOUT);
server.set_server_name(concat!("Oxigraph/", env!("CARGO_PKG_VERSION")))?;
eprintln!("Listening for requests at http://{}", &bind);
server.listen(bind)?;
Ok(())
}
type HttpError = (Status, String); type HttpError = (Status, String);
fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpError> { fn handle_request(
request: &mut Request,
store: Store,
allow_writes: bool,
) -> Result<Response, HttpError> {
match (request.url().path(), request.method().as_ref()) { match (request.url().path(), request.method().as_ref()) {
("/", "HEAD") => Ok(Response::builder(Status::OK) ("/", "HEAD") => Ok(Response::builder(Status::OK)
.with_header(HeaderName::CONTENT_TYPE, "text_html") .with_header(HeaderName::CONTENT_TYPE, "text_html")
@ -388,7 +437,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
Err(unsupported_media_type(&content_type)) Err(unsupported_media_type(&content_type))
} }
} }
("/update", "POST") => { ("/update", "POST") if allow_writes => {
let content_type = let content_type =
content_type(request).ok_or_else(|| bad_request("No Content-Type given"))?; content_type(request).ok_or_else(|| bad_request("No Content-Type given"))?;
if content_type == "application/sparql-update" { if content_type == "application/sparql-update" {
@ -471,7 +520,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
) )
} }
} }
(path, "PUT") if path.starts_with("/store") => { (path, "PUT") if path.starts_with("/store") && allow_writes => {
let content_type = let content_type =
content_type(request).ok_or_else(|| bad_request("No Content-Type given"))?; content_type(request).ok_or_else(|| bad_request("No Content-Type given"))?;
if let Some(target) = store_target(request)? { if let Some(target) = store_target(request)? {
@ -514,7 +563,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
Ok(Response::builder(Status::NO_CONTENT).build()) Ok(Response::builder(Status::NO_CONTENT).build())
} }
} }
(path, "DELETE") if path.starts_with("/store") => { (path, "DELETE") if path.starts_with("/store") && allow_writes => {
if let Some(target) = store_target(request)? { if let Some(target) = store_target(request)? {
match target { match target {
NamedGraphName::DefaultGraph => store NamedGraphName::DefaultGraph => store
@ -541,7 +590,7 @@ fn handle_request(request: &mut Request, store: Store) -> Result<Response, HttpE
} }
Ok(Response::builder(Status::NO_CONTENT).build()) Ok(Response::builder(Status::NO_CONTENT).build())
} }
(path, "POST") if path.starts_with("/store") => { (path, "POST") if path.starts_with("/store") && allow_writes => {
let content_type = let content_type =
content_type(request).ok_or_else(|| bad_request("No Content-Type given"))?; content_type(request).ok_or_else(|| bad_request("No Content-Type given"))?;
if let Some(target) = store_target(request)? { if let Some(target) = store_target(request)? {
@ -1227,7 +1276,7 @@ mod tests {
.assert() .assert()
.success() .success()
.stdout("") .stdout("")
.stderr(predicate::str::starts_with("1 triples loaded")); .stderr(predicate::str::contains("1 triples loaded"));
Ok(()) Ok(())
} }
@ -1242,7 +1291,7 @@ mod tests {
.assert() .assert()
.success() .success()
.stdout("") .stdout("")
.stderr(predicate::str::starts_with("1 triples loaded")); .stderr(predicate::str::contains("1 triples loaded"));
Ok(()) Ok(())
} }
@ -1260,7 +1309,7 @@ mod tests {
.assert() .assert()
.success() .success()
.stdout("") .stdout("")
.stderr(predicate::str::starts_with("1 triples loaded")); .stderr(predicate::str::contains("1 triples loaded"));
Ok(()) Ok(())
} }
@ -1279,7 +1328,7 @@ mod tests {
.assert() .assert()
.success() .success()
.stdout("") .stdout("")
.stderr(predicate::str::starts_with("1 triples loaded")); .stderr(predicate::str::contains("1 triples loaded"));
Ok(()) Ok(())
} }
@ -1293,7 +1342,7 @@ mod tests {
.assert() .assert()
.success() .success()
.stdout("") .stdout("")
.stderr(predicate::str::starts_with("1 triples loaded")); .stderr(predicate::str::contains("1 triples loaded"));
Ok(()) Ok(())
} }
@ -1861,7 +1910,7 @@ mod tests {
} }
fn exec(&self, mut request: Request) -> Response { fn exec(&self, mut request: Request) -> Response {
handle_request(&mut request, self.store.clone()) handle_request(&mut request, self.store.clone(), true)
.unwrap_or_else(|(status, message)| error(status, message)) .unwrap_or_else(|(status, message)| error(status, message))
} }

Loading…
Cancel
Save