child: Always print everything to our output

Also don't buffer the child's stdout and stderr.
master
Nick Fitzgerald 7 years ago
parent 2fa961d97d
commit 1621d18072
  1. 245
      src/child.rs

@ -1,88 +1,121 @@
//! Utilties for managing child processes. //! Utilties for managing child processes.
//! //!
//! This module helps us ensure that: //! This module helps us ensure that all child processes that we spawn get
//! //! properly logged and their output is logged as well.
//! * All child processes that we spawn get properly logged and their output is
//! logged as well.
//!
//! * That any "quick running" child process does not spam the console with its
//! output.
//!
//! * That any "long running" child process gets its output copied to our
//! stderr, so that the user isn't sitting there wondering if anything at all
//! is happening. This is important for showing `cargo build`'s output, for
//! example.
use error::Error; use error::Error;
use failure; use failure;
use slog::Logger; use slog::Logger;
use std::{ use std::{
io::{self, BufRead}, io::{self, Read},
process, mem, process, string,
sync::mpsc, sync::mpsc,
thread, time, thread,
}; };
use PBAR; use PBAR;
fn taking_too_long(since: time::Instant) -> bool { #[derive(Debug)]
since.elapsed() > time::Duration::from_millis(200) enum OutputFragment {
} Stdout(Vec<u8>),
Stderr(Vec<u8>),
enum Output<S: AsRef<str>> {
Stdout(S),
Stderr(S),
} }
fn print_child_output<S>(out: Result<Output<S>, io::Error>, command_name: &str) /// Read data from the give reader and send it as an `OutputFragment` over the
/// given sender.
fn read_and_send<R, F>(
mut reader: R,
sender: mpsc::Sender<OutputFragment>,
mut map: F,
) -> io::Result<()>
where where
S: AsRef<str>, R: Read,
F: FnMut(Vec<u8>) -> OutputFragment,
{ {
let message = match out { let mut buf = vec![0; 1024];
Ok(Output::Stdout(line)) => format!("{} (stdout): {}", command_name, line.as_ref()), loop {
Ok(Output::Stderr(line)) => format!("{} (stderr): {}", command_name, line.as_ref()), match reader.read(&mut buf) {
Err(e) => format!("error reading {} output: {}", command_name, e), Err(e) => {
}; if e.kind() == io::ErrorKind::Interrupted {
PBAR.message(&message); continue;
} else {
return Err(e);
}
}
Ok(0) => return Ok(()),
Ok(n) => {
buf.truncate(n);
let buf = mem::replace(&mut buf, vec![0; 1024]);
sender.send(map(buf)).unwrap();
}
}
}
} }
fn handle_output<I, S>( /// Accumulates output from a stream of output fragments and calls a callback on
output: I, /// each complete line as it is accumulating.
logger: &Logger, struct OutputAccumulator<F> {
should_print: bool, result: String,
stdout: &mut String, in_progress: Vec<u8>,
stderr: &mut String, on_each_line: F,
command_name: &str, }
) where
I: IntoIterator<Item = Result<Output<S>, io::Error>>, impl<F> OutputAccumulator<F>
S: AsRef<str>, where
F: FnMut(&str),
{ {
for out in output { /// Construct a new output accumulator with the given `on_each_line`
match out { /// callback.
Ok(Output::Stdout(ref line)) => { fn new(on_each_line: F) -> OutputAccumulator<F> {
let line = line.as_ref().trim_end(); OutputAccumulator {
info!(logger, "{} (stdout): {}", command_name, line); result: String::new(),
stdout.push_str(line); in_progress: Vec::new(),
} on_each_line,
Ok(Output::Stderr(ref line)) => { }
let line = line.as_ref().trim_end(); }
info!(logger, "{} (stderr): {}", command_name, line);
stderr.push_str(line); /// Add another fragment of output to the accumulation, calling the
} /// `on_each_line` callback for any complete lines we accumulate.
Err(ref e) => { fn push(&mut self, fragment: Vec<u8>) -> Result<(), string::FromUtf8Error> {
warn!(logger, "error reading output: {}", e); debug_assert!(!fragment.is_empty());
self.in_progress.extend(fragment);
if let Some((last_newline, _)) = self
.in_progress
.iter()
.cloned()
.enumerate()
.rev()
.find(|(_, ch)| *ch == b'\n')
{
let next_in_progress: Vec<u8> = self.in_progress[last_newline + 1..]
.iter()
.cloned()
.collect();
let mut these_lines = mem::replace(&mut self.in_progress, next_in_progress);
these_lines.truncate(last_newline + 1);
let these_lines = String::from_utf8(these_lines)?;
for line in these_lines.lines() {
(self.on_each_line)(line);
} }
self.result.push_str(&these_lines);
} }
if should_print {
print_child_output(out, command_name); Ok(())
}
/// Finish accumulation, run the `on_each_line` callback on the final line
/// (if any), and return the accumulated output.
fn finish(mut self) -> Result<String, string::FromUtf8Error> {
if !self.in_progress.is_empty() {
let last_line = String::from_utf8(self.in_progress)?;
(self.on_each_line)(&last_line);
self.result.push_str(&last_line);
} }
Ok(self.result)
} }
} }
/// Run the given command and return its stdout. /// Run the given command and return its stdout.
///
/// If the command takes "too long", then its stdout and stderr are also piped
/// to our stdout and stderr so that the user has an idea of what is going on
/// behind the scenes.
pub fn run( pub fn run(
logger: &Logger, logger: &Logger,
mut command: process::Command, mut command: process::Command,
@ -95,10 +128,8 @@ pub fn run(
.stderr(process::Stdio::piped()) .stderr(process::Stdio::piped())
.spawn()?; .spawn()?;
let since = time::Instant::now(); let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stdout = io::BufReader::new(child.stdout.take().unwrap());
let stderr = io::BufReader::new(child.stderr.take().unwrap());
let (send, recv) = mpsc::channel(); let (send, recv) = mpsc::channel();
let stdout_send = send.clone(); let stdout_send = send.clone();
@ -108,69 +139,39 @@ pub fn run(
// and stderr on a separate thread to avoid potential dead locks with // and stderr on a separate thread to avoid potential dead locks with
// waiting on the child process. // waiting on the child process.
let stdout_handle = thread::spawn(move || { let stdout_handle =
for line in stdout.lines() { thread::spawn(move || read_and_send(stdout, stdout_send, OutputFragment::Stdout));
stdout_send.send(line.map(Output::Stdout)).unwrap(); let stderr_handle =
} thread::spawn(move || read_and_send(stderr, stderr_send, OutputFragment::Stderr));
});
let stderr_handle = thread::spawn(move || { let mut stdout = OutputAccumulator::new(|line| {
for line in stderr.lines() { info!(logger, "{} (stdout): {}", command_name, line);
stderr_send.send(line.map(Output::Stderr)).unwrap(); PBAR.message(line)
} });
let mut stderr = OutputAccumulator::new(|line| {
info!(logger, "{} (stderr): {}", command_name, line);
PBAR.message(line)
}); });
let mut stdout = String::new(); for output in recv {
let mut stderr = String::new(); match output {
let mut is_long_running = false; OutputFragment::Stdout(line) => stdout.push(line)?,
OutputFragment::Stderr(line) => stderr.push(line)?,
};
}
loop { let stdout = stdout.finish()?;
if !is_long_running && taking_too_long(since) { let stderr = stderr.finish()?;
// The command has now been taking too long. Print the buffered stdout and
// stderr, and then continue waiting on the child.
stdout
.lines()
.map(|l| Ok(Output::Stdout(l)))
.chain(stderr.lines().map(|l| Ok(Output::Stderr(l))))
.for_each(|l| print_child_output(l, command_name));
is_long_running = true;
}
// Get any output that's been sent on the channel without blocking. // Join the threads reading the child's output to make sure the finish OK.
handle_output( stdout_handle.join().unwrap()?;
recv.try_iter(), stderr_handle.join().unwrap()?;
logger,
is_long_running,
&mut stdout,
&mut stderr,
command_name,
);
if let Some(exit) = child.try_wait()? {
// Block on collecting the rest of the child's output.
handle_output(
recv,
logger,
is_long_running,
&mut stdout,
&mut stderr,
command_name,
);
// Join the threads reading the child's output to make sure the
// finish OK.
stdout_handle.join().unwrap();
stderr_handle.join().unwrap();
if exit.success() {
return Ok(stdout);
} else {
let msg = format!("`{}` did not exit successfully", command_name);
return Err(Error::cli(&msg, stderr.into(), exit).into());
}
}
thread::yield_now(); let exit = child.wait()?;
if exit.success() {
return Ok(stdout);
} else {
let msg = format!("`{}` did not exit successfully", command_name);
return Err(Error::cli(&msg, stderr.into(), exit).into());
} }
} }

Loading…
Cancel
Save