【问题标题】:Why does spawning threads using Iterator::map not run the threads in parallel?为什么使用 Iterator::map 生成线程不能并行运行线程?
【发布时间】:2019-04-03 08:50:09
【问题描述】:

我用 Rust 编写了一个简单的多线程应用程序来将数字从 1 加到 x。 (我知道有一个公式,但重点是用 Rust 编写一些多线程代码,而不是得到结果。) 它工作得很好,但是在我将它重构为更具功能性的风格而不是命令式之后,多线程并没有更多的加速。检查 CPU 使用率时,我的 4 核 / 8 线程 CPU 似乎只使用了一个内核。原代码 CPU 占用率为 790%,重构后的 CPU 占用率仅为 99%。

原代码:

use std::thread;

fn main() {
    let mut handles: Vec<thread::JoinHandle<u64>> = Vec::with_capacity(8);

    const thread_count: u64 = 8;
    const batch_size: u64 = 20000000;

    for thread_id in 0..thread_count {
        handles.push(thread::spawn(move || {
            let mut sum = 0_u64;

            for i in thread_id * batch_size + 1_u64..(thread_id + 1) * batch_size + 1_u64 {
                sum += i;
            }

            sum
        }));
    }

    let mut total_sum = 0_u64;

    for handle in handles.into_iter() {
        total_sum += handle.join().unwrap();
    }
    println!("{}", total_sum);
}

重构后的代码:

use std::thread;

fn main() {
    const THREAD_COUNT: u64 = 8;
    const BATCH_SIZE: u64 = 20000000;

    // spawn threads that calculate a part of the sum
    let handles = (0..THREAD_COUNT).map(|thread_id| {
        thread::spawn(move ||
            // calculate the sum of all numbers from assigned to this thread
            (thread_id * BATCH_SIZE + 1 .. (thread_id + 1) * BATCH_SIZE + 1)
                .fold(0_u64,|sum, number| sum + number))
    });

    // add the parts of the sum together to get the total sum
    let sum = handles.fold(0_u64, |sum, handle| sum + handle.join().unwrap());

    println!("{}", sum);
}

程序的输出是相同的(12800000080000000),但重构后的版本慢了 5-6 倍。

似乎迭代器是惰性求值的。如何强制评估整个迭代器?我试图将它收集到 [thread::JoinHandle&lt;u64&gt;; THREAD_COUNT as usize] 类型的数组中,但随后我收到以下错误:

  --> src/main.rs:14:7
   |
14 |     ).collect::<[thread::JoinHandle<u64>; THREAD_COUNT as usize]>();
   |       ^^^^^^^ a collection of type `[std::thread::JoinHandle<u64>; 8]` cannot be built from `std::iter::Iterator<Item=std::thread::JoinHandle<u64>>`
   |
   = help: the trait `std::iter::FromIterator<std::thread::JoinHandle<u64>>` is not implemented for `[std::thread::JoinHandle<u64>; 8]`

收集到向量中确实有效,但这似乎是一个奇怪的解决方案,因为大小是预先知道的。有没有比使用矢量更好的方法?

【问题讨论】:

  • How do I collect into an array? — TL;DR 你不能轻易做到。使用ArrayVec 或者只使用Vec,因为迭代器的长度是已知的,所以无论如何它只会进行一次分配。

标签: multithreading functional-programming rust


【解决方案1】:

Rust 中的迭代器是惰性的,因此在 handles.fold 尝试访问迭代器的相应元素之前,您的线程不会启动。基本上会发生什么:

  1. handles.fold 尝试访问迭代器的第一个元素。
  2. 第一个线程已启动。
  3. handles.fold 调用它的闭包,它为第一个线程调用 handle.join()
  4. handle.join 等待第一个线程完成。
  5. handles.fold 尝试访问迭代器的第二个元素。
  6. 第二个线程已启动。
  7. 等等。

您应该在折叠结果之前将句柄收集到一个向量中:

let handles: Vec<_> = (0..THREAD_COUNT)
    .map(|thread_id| {
        thread::spawn(move ||
            // calculate the sum of all numbers from assigned to this thread
            (thread_id * BATCH_SIZE + 1 .. (thread_id + 1) * BATCH_SIZE + 1)
                .fold(0_u64,|sum, number| sum + number))
    })
    .collect();

或者你可以使用像Rayon 这样提供并行迭代器的板条箱。

【讨论】:

  • 啊,谢谢。这正是我的想法。 (当你发布这个时,我正在写编辑。)我想我会看看人造丝。
猜你喜欢
  • 2014-06-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-06-07
  • 2017-10-10
  • 1970-01-01
相关资源
最近更新 更多