【问题标题】:How to handle I/O of a subprocess asynchronously? [duplicate]如何异步处理子进程的 I/O? [复制]
【发布时间】:2019-04-22 16:36:53
【问题描述】:

我有一个子进程,它可能会或可能不会在特定时间内向其标准输出写入内容,例如3 秒。

如果子进程标准输出中的新行以正确的开头,我想返回该行。 理想情况下,我想实现这样的事情:

use std::io::{BufRead, BufReader};
use std::thread;
use std::time::Duration;

pub fn wait_for_or_exit(
    reader: &BufReader<&mut std::process::ChildStdout>,
    wait_time: u64,
    cmd: &str,
) -> Option<String> {
    let signal: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
    let signal_clone = signal.clone();
    let child = thread::spawn(move || {
        thread::sleep(Duration::from_millis(wait_time));
        signal_clone.store(true, Ordering::Relaxed);
    });
    let mut line = String::new();
    while !signal.load(Ordering::Relaxed) {
        //Sleep a really small amount of time not to block cpu
        thread::sleep(Duration::from_millis(10));
        //This line is obviously invalid!
        if reader.has_input() {
            line.clear();
            reader.read_line(&mut line).unwrap();
            if line.starts_with(cmd) {
                return Some(line);
            }
        }
    }
    None
}

这里唯一不起作用的是reader.has_input()

显然,如果子进程多次重复比wait_time 更快地响应,将会有很多休眠线程,但我可以通过通道来处理。

【问题讨论】:

    标签: asynchronous rust subprocess


    【解决方案1】:

    有两种方法。

    1. 您可以启动一个单独的线程,然后使用某种机制(可能是通道)向等待的线程发出成功或失败信号。
    2. 您可以使用前面提到的异步 IO,例如 futures 和 tokio lib。

    我将演示两者。我更喜欢futures/Tokio 方法,但如果您不熟悉futures 模型,那么选项一可能会更好。

    Rust 标准库有一个 Channels API,这个通道实际上有一个 recv_timeout,它可以帮助我们很多。

    use std::thread;
    use std::time::Duration;
    use std::sync::mpsc;
    
    // this spins up a separate thread in which to wait for stuff to read
    // from the BufReader<ChildStdout> 
    // If we successfully read, we send the string over the Channel.
    // Back in the original thread, we wait for an answer over the channel
    // or timeout in wait_time secs. 
    pub fn wait_for_or_exit(
        reader: &BufReader<&mut std::process::ChildStdout>,
        wait_time: u64,
        cmd: &str,
    ) -> Option<String> {
        let (sender, receiver) = mpsc::channel();
    
        thread::spawn(move || {
            let line = reader.read_line();
            sender.send(line);
        });
    
        match receiver.recv_timeout(Duration::from_secs(wait_time)) {
            Ok(line) => if line.starts_with(cmd) 
               { Some(line) } else 
               { None },
            Err(mpsc::RecvTimeoutError::Timeout) => None,
            Err(mpsc::RecvTimeoutError::Disconnected) => None  
    
        }
    }
    

    选项二假设您正在构建基于未来的应用程序。为了使用 Async IO 完成您想要的,一个文件描述符可以让我们设置NON_BLOCKING。幸运的是,我们不必自己这样做。 Futures 和 Tokio API 很好地处理了这个问题。权衡是您必须从非阻塞期货中编写代码。

    下面的代码几乎完全取自Tokio Process,带有来自 Tokio API 的 Futures 超时。

    extern crate futures;
    extern crate tokio;
    extern crate tokio_process;
    
    use std::process::Command;
    use std::time::{Duration};
    
    use futures::Future;
    use tokio_process::CommandExt;
    use tokio::prelude::*;
    
    const TIMEOUT_SECS: u64 = 3;
    
    fn main() {
        // Like above, but use `output_async` which returns a future instead of
        // immediately returning the `Child`.
        let output = Command::new("echo").arg("hello").arg("world")
                            .output_async();
    
        let future = output.map_err(|e| panic!("failed to collect output: {}", e))
            .map(|output| {
                assert!(output.status.success());
                assert_eq!(output.stdout, b"hello world\n");
                println!("received output: {}",     String::from_utf8(output.stdout).unwrap());
            })
            .timeout(Duration::from_secs(TIMEOUT_SECS)) // here is where we say we only want to wait TIMETOUT seconds
            .map_err(|_e| { println!("Timed out waiting for data"); });
    
        tokio::run(future);
    }
    

    【讨论】:

    • 我无法编译选项一。在thread::spawn 中,它抱怨读者需要静态生命周期。但是,我认为选项一无论如何都不可行,因为底层子进程将被杀死并重新启动数百次,并且不能保证它会回答。这将导致数百个侦听器线程,不是吗?我现在正在尝试选项二。
    猜你喜欢
    • 2016-01-06
    • 2015-09-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-07
    • 2013-02-21
    • 2013-06-19
    相关资源
    最近更新 更多