【问题标题】:Is there an API to race N threads (or N closures on N threads) to completion?是否有一个 API 可以让 N 个线程(或 N 个线程上的 N 个闭包)竞赛完成?
【发布时间】:2018-09-25 12:41:33
【问题描述】:

给定几个以Output 值完成的线程,我如何获得第一个产生的Output?理想情况下,仍然能够在稍后按照它们的生成顺序获取剩余的Outputs,并记住某些线程可能会或可能不会终止。

例子:

struct Output(i32);

fn main() {
    let mut spawned_threads = Vec::new();

    for i in 0..10 {
        let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
            // pretend to do some work that takes some amount of time
            ::std::thread::sleep(::std::time::Duration::from_millis(
                (1000 - (100 * i)) as u64,
            ));
            Output(i) // then pretend to return the `Output` of that work
        });
        spawned_threads.push(join_handle);
    }

    // I can do this to wait for each thread to finish and collect all `Output`s
    let outputs_in_order_of_thread_spawning = spawned_threads
        .into_iter()
        .map(::std::thread::JoinHandle::join)
        .collect::<Vec<::std::thread::Result<Output>>>();

    // but how would I get the `Output`s in order of completed threads?
}

我可以使用共享队列/通道/类似的方法自己解决问题,但是是否有内置 API 或现有库可以更优雅地为我解决这个用例?

我正在寻找这样的 API:

fn race_threads<A: Send>(
    threads: Vec<::std::thread::JoinHandle<A>>
) -> (::std::thread::Result<A>, Vec<::std::thread::JoinHandle<A>>) {
    unimplemented!("so far this doesn't seem to exist")
}

Rayon's join 是我能找到的最接近的,但是 a)它只运行 2 个闭包而不是任意数量的闭包,并且 b)线程池 w/ 工作窃取方法不会对我的用例来说,有一些可能会永远运行的闭包。)

可以使用来自 How to check if a thread has finished in Rust? 的指针来解决这个用例,就像可以使用 MPSC 通道来解决这个用例一样,但是在这里我需要一个干净的 API 来竞争 n 线程(或失败) , n n 线程上的闭包)。

【问题讨论】:

  • 你可以只通过 mpsc 发送者到达线程并在接收端只消耗一个结果。
  • 我相信How to check if a thread has finished in Rust?已经回答了您的问题。如果您不同意,请edit您的问题解释这些答案如何没有回答这个问题。
  • @the8472 是的,这就是我所说的“使用共享队列/通道/类似”的意思——我正在寻求一种比这更简洁的方法。
  • @Shepmaster 我已经编辑了这个问题,以澄清我正在寻求现有的收集结果的解决方案;我认为使用您链接的 Q,我仍然需要自己滚动“在队列中收集结果”(尽管感谢您的指针!)。
  • 关于惯用 Rust 的旁注:你真的应该导入一些类型。拥有这么多完全合格的东西真是丑陋。

标签: multithreading concurrency rust


【解决方案1】:

没有,没有这样的 API。

您已经看到了多种解决问题的方法:

有时在编程时,您必须超越将预制块粘在一起。这应该是编程的一个乐趣部分。我鼓励你接受它。使用可用的组件创建您理想的 API 并将其发布到 crates.io


我真的不明白频道版本有什么可怕的:

use std::{sync::mpsc, thread, time::Duration};

#[derive(Debug)]
struct Output(i32);

fn main() {
    let (tx, rx) = mpsc::channel();

    for i in 0..10 {
        let tx = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis((1000 - (100 * i)) as u64));
            tx.send(Output(i)).unwrap();
        });
    }
    // Don't hold on to the sender ourselves
    // Otherwise the loop would never terminate
    drop(tx);

    for r in rx {
        println!("{:?}", r);
    }
}

【讨论】:

  • 不只是把积木粘在一起 - 尊重,没有必要对我说话:)我是带着这个想法问的,如果它不存在,我可以这样写板条箱,或者如果其他人稍后写它,那么我想通过这个问题轻松找到它。但如果它已经存在,那么我宁愿不重新发明轮子。
  • 回答你隐含的问题,没什么可怕的。但是要说明为什么我宁愿重用现有解决方案而不是自己推出自己的解决方案,您的 sn-p 不会处理衍生线程中的恐慌,所以也许 Output 需要是 Result&lt;Output, String&gt;,但是如果 @987654329 @ 这样捕获恐慌不起作用,也许设置一个恐慌钩子,除此之外我需要保留任何现有的恐慌钩子..也许改为使用共享的Arc 减少的引用计数来检测恐慌,嗯 - 等等. 一切都可以解决,但重用一个好的解决方案更容易。毕竟,我也不会重新发明lazy_static
【解决方案2】:

这些问题可以通过使用condition variable来解决:

use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug)]
struct Output(i32);

enum State {
    Starting,
    Joinable,
    Joined,
}

fn main() {
    let pair = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
    let mut spawned_threads = Vec::new();

    let &(ref lock, ref cvar) = &*pair;
    for i in 0..10 {
        let my_pair = pair.clone();
        let join_handle: ::std::thread::JoinHandle<Output> = ::std::thread::spawn(move || {
            // pretend to do some work that takes some amount of time
            ::std::thread::sleep(::std::time::Duration::from_millis(
                (1000 - (100 * i)) as u64,
            ));

            let &(ref lock, ref cvar) = &*my_pair;
            let mut joinable = lock.lock().unwrap();
            joinable[i] = State::Joinable;
            cvar.notify_one();
            Output(i as i32) // then pretend to return the `Output` of that work
        });
        lock.lock().unwrap().push(State::Starting);
        spawned_threads.push(Some(join_handle));
    }

    let mut should_stop = false;
    while !should_stop {
        let locked = lock.lock().unwrap();
        let mut locked = cvar.wait(locked).unwrap();

        should_stop = true;
        for (i, state) in locked.iter_mut().enumerate() {
            match *state {
                State::Starting => {
                    should_stop = false;
                }
                State::Joinable => {
                    *state = State::Joined;
                    println!("{:?}", spawned_threads[i].take().unwrap().join());
                }
                State::Joined => (),
            }
        }
    }
}

(playground link)

我并不是说这是最简单的方法。每次子线程完成时,条件变量都会唤醒主线程。该列表可以显示每个线程的状态,如果一个(即将)完成,则可以加入。

【讨论】:

  • 这无论如何都不能保证输出将按顺序显示。设置变量和读取变量之间存在竞争条件:当主线程动作时,多个线程可能“准备好”,它永远不知道哪个是第一个。
  • 无论如何,“第一次”线程整理的概念并没有真正的意义。这将大致按照线程完成的顺序加入线程,大约在同一时间完成的线程按创建顺序处理,这在大多数情况下就足够了。例如,这就是 C# 中 WaitAny 方法所保证的内容:“如果在调用期间发出多个对象,则返回值是所有已发出对象中索引值最小的已发出对象的数组索引。”
  • 完全同意。但是,我认为应该在答案本身中提到这个(固有的)限制。例如,一个 MPSC 队列来传达线程的索引(从线程发布)会更精确......并将竞争转移给首先锁定队列的人。
  • 使用Condvar 比使用mpsc::channel 有什么优势吗?我很欣赏这个方向的指针(特别是因为 Condvar 的 API 有点棘手),但它似乎比基于通道的解决方案有更多的移动部分,而且我还没有发现任何好处。
猜你喜欢
  • 1970-01-01
  • 2021-06-03
  • 2018-03-11
  • 1970-01-01
  • 2021-06-07
  • 2021-12-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多