//! Utilties for managing child processes. //! //! This module helps us ensure that all child processes that we spawn get //! properly logged and their output is logged as well. use failure::Error; use log::info; use std::{ io::{self, Read}, mem, process::{Command, Stdio}, string, sync::mpsc, thread, }; use PBAR; #[derive(Debug)] enum OutputFragment { Stdout(Vec), Stderr(Vec), } /// Return a new Command object pub fn new_command(program: &str) -> Command { // On Windows, initializes launching as `cmd /c `. // Initializing only with `Command::new("npm")` will launch // `npm` with quotes, `"npm"`, causing a run-time error on Windows. // See rustc: #42436, #42791, #44542 if cfg!(windows) { let mut cmd = Command::new("cmd"); cmd.arg("/c").arg(program); cmd } else { Command::new(program) } } /// Read data from the give reader and send it as an `OutputFragment` over the /// given sender. fn read_and_send( mut reader: R, sender: mpsc::Sender, mut map: F, ) -> io::Result<()> where R: Read, F: FnMut(Vec) -> OutputFragment, { let mut buf = vec![0; 1024]; loop { match reader.read(&mut buf) { Err(e) => { if e.kind() == io::ErrorKind::Interrupted { continue; } else { return Err(e); } } Ok(0) => return Ok(()), Ok(n) => { buf.truncate(n); let buf = mem::replace(&mut buf, vec![0; 1024]); sender.send(map(buf)).unwrap(); } } } } /// Accumulates output from a stream of output fragments and calls a callback on /// each complete line as it is accumulating. struct OutputAccumulator { result: String, in_progress: Vec, on_each_line: F, } impl OutputAccumulator where F: FnMut(&str), { /// Construct a new output accumulator with the given `on_each_line` /// callback. fn new(on_each_line: F) -> OutputAccumulator { OutputAccumulator { result: String::new(), in_progress: Vec::new(), on_each_line, } } /// Add another fragment of output to the accumulation, calling the /// `on_each_line` callback for any complete lines we accumulate. fn push(&mut self, fragment: Vec) -> Result<(), string::FromUtf8Error> { debug_assert!(!fragment.is_empty()); self.in_progress.extend(fragment); if let Some((last_newline, _)) = self .in_progress .iter() .cloned() .enumerate() .rev() .find(|(_, ch)| *ch == b'\n') { let next_in_progress: Vec = self.in_progress[last_newline + 1..] .iter() .cloned() .collect(); let mut these_lines = mem::replace(&mut self.in_progress, next_in_progress); these_lines.truncate(last_newline + 1); let these_lines = String::from_utf8(these_lines)?; for line in these_lines.lines() { (self.on_each_line)(line); } self.result.push_str(&these_lines); } Ok(()) } /// Finish accumulation, run the `on_each_line` callback on the final line /// (if any), and return the accumulated output. fn finish(mut self) -> Result { if !self.in_progress.is_empty() { let last_line = String::from_utf8(self.in_progress)?; (self.on_each_line)(&last_line); self.result.push_str(&last_line); } Ok(self.result) } } /// Run the given command and return its stdout. pub fn run(mut command: Command, command_name: &str) -> Result { info!("Running {:?}", command); let mut child = command .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; let stdout = child.stdout.take().unwrap(); let stderr = child.stderr.take().unwrap(); let (send, recv) = mpsc::channel(); let stdout_send = send.clone(); let stderr_send = send; // Because pipes have a fixed-size buffer, we need to keep reading stdout // and stderr on a separate thread to avoid potential dead locks with // waiting on the child process. let stdout_handle = thread::spawn(move || read_and_send(stdout, stdout_send, OutputFragment::Stdout)); let stderr_handle = thread::spawn(move || read_and_send(stderr, stderr_send, OutputFragment::Stderr)); let mut stdout = OutputAccumulator::new(|line| { info!("{} (stdout): {}", command_name, line); PBAR.message(line) }); let mut stderr = OutputAccumulator::new(|line| { info!("{} (stderr): {}", command_name, line); PBAR.message(line) }); for output in recv { match output { OutputFragment::Stdout(line) => stdout.push(line)?, OutputFragment::Stderr(line) => stderr.push(line)?, }; } let stdout = stdout.finish()?; let stderr = stderr.finish()?; // Join the threads reading the child's output to make sure the finish OK. stdout_handle.join().unwrap()?; stderr_handle.join().unwrap()?; let exit = child.wait()?; if exit.success() { return Ok(stdout); } else { drop((stdout, stderr)); bail!("failed to execute `{}`: exited with {}", command_name, exit) } }