【问题标题】:julia multi-threaded not scaling for embarrassingly parallel job朱莉娅多线程不能扩展令人尴尬的并行工作
【发布时间】:2020-11-19 18:21:48
【问题描述】:

以下代码计算从几组中获得 50 张独特牌的平均抽奖次数。重要的是,这个问题不需要太多 RAM,并且在多线程模式下启动时不共享任何变量。当使用四个以上的线程启动以执行 400,000 次模拟时,它始终比同时启动并执行 200,000 次模拟的两个进程多花费大约一秒钟的时间。这一直困扰着我,我找不到任何解释。

这是 epic_draw_multi_thread.jl 中的 Julia 代码:

using Random
using Printf
import Base.Threads.@spawn

function pickone(dist)
    n = length(dist)
    i = 1
    r = rand()
    while r >= dist[i] && i<n 
        i+=1
    end
    return i
end  

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist)
    item_type = pickone(type_dist)
    item_number = pickone(unique_elements_dist[item_type])
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)

    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
        if reset
            items .= 0
        else
            items[items.>1] -= 1
        end
    end

    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
    
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)

unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]

str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)

这是在 shell 发出的在两个线程上运行的命令以及输出和计时结果:

time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real    0m14.347s
user    0m26.959s
sys     0m2.124s

这些是在 shell 发出的命令,用于运行两个进程,每个进程的一半作业大小以及输出和计时结果:

time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real    0m12.919s
user    0m12.688s
sys     0m0.300s
70.448695
real    0m12.996s
user    0m12.790s
sys     0m0.308s

无论我重复多少次实验,我总是让多线程模式变慢。 备注:

  1. 我创建了并行代码来近似 PI 的值,但没有 遇到同样的问题。但是,我没有看到任何内容 可能导致线程之间的任何冲突导致速度缓慢的代码。
  2. 当开始使用多个线程时,我使用线程数减一来执行绘制。如果做不到这一点,最后一个线程似乎会挂起。此语句 t = max(Threads.nthreads() - 1, 1) 可以更改为 t = Threads.nthreads() 以使用确切的可用线程数。

编辑于 2020 年 11 月 20 日

实施了 Przemyslaw Szufel 建议。这是新代码:

using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools

function pickone(dist, mt)
    n = length(dist)
    i = 1
    r = rand(mt)
    while r >= dist[i] && i<n 
        i+=1
    end
    return i
end  

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist, mt)
    item_type = pickone(type_dist, mt)
    item_number = pickone(unique_elements_dist[item_type], mt)
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x, mt)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist, mt)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)

    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
        if reset
            items .= 0
        else
            items[items.>1] -= 1
        end
    end

    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    mts = MersenneTwister.(1:t)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
    
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)

unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]

str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
    

更新的基准:

Threads          @btime     Linux Time       
1 (2 processes)  9.927 s    0m44.871s 
2 (1 process)   20.237 s    1m14.156s
3 (1 process)   14.302 s    1m2.114s

【问题讨论】:

  • 那是什么版本的 Julia?什么操作系统?我无法在 Linux 上使用 Julia v1.3、v1.4、v1.5、v1.6 中的任何一个来重现您的问题(多线程比单线程慢)。对我来说,多线程总是比单线程快,即使它没有完全扩展(使用 4 个 Julia 线程 -> 代码中的 3 个线程大约快 2.2 倍)
  • 我使用的是 1.5.2 版本
  • 代码对我来说有点像Fortranesque——很多事情都是手动完成的。例如,您知道 Distributions.jl 吗?
  • @phipsgabler 是的,我知道 Distributions.jl。但是,我想在满足条件之前绘制值,因此不知道我需要提前生成多少个随机数。有 n 副牌,每副牌包含不同数量的牌。我随机选择一个牌组,然后从该牌组中随机选择一张牌。我继续这样做,直到我得到 x 不同的卡片。关于如何做得更好的任何想法?不同牌组中的所有牌都不同,选择牌组或其他牌组的概率也不同。

标签: multithreading julia scaling


【解决方案1】:

这里有两个问题:

  1. 您没有正确衡量性能
  2. 在线程中生成随机数时,您应该为每个线程设置一个单独的 MersenneTwister 随机状态以获得最佳性能(否则您的随机状态将在所有线程之间共享并且需要进行同步)

目前您正在测量“Julia 开始时间”+“代码编译时间”+“运行时间”的时间。多线程代码的编译显然比单线程代码的编译时间要长。启动 Julia 本身也需要一两秒钟。

这里有两个选择。最简单的方法是使用BenchmarkTools@btime 宏来测量代码内部的执行时间。 另一种选择是将您的代码制作成一个包,并通过PackageCompiler 将其编译为 Julia 图像。但是,您仍将测量“Julia 开始时间”+“Julia 执行时间”

随机数状态可以创建为:

mts = MersenneTwister.(1:Threads.nthreads());

然后使用如rand(mts[Threads.threadid()])

【讨论】:

  • 感谢您的回答。代码完全相同,因此以相同的方式编译。我会看一下MersenneTwister,尽管正如注释中所说,我在近似 PI 时生成了许多随机数并且没有遇到同样的问题。最后,即使使用命令行中的近似测量方法,多线程的运行时间也始终更长。不过我会看看你的建议。
  • 在许多实际案例中,我看到一个单线程代码需要 1 秒来编译,而相同的多线程代码需要 5 秒。也许尝试在运行时间超过 30 秒的代码上运行基准测试?
  • 您对随机发生器的回答可能是正确的。根据discourse.julialang.org/t/multi-threading/22708,从不同线程调用时 rand() 的行为是未定义的。也许它已被修改为包括同步,这可以解释线程相互碰撞以获得随机数的额外时间。现在的问题是:如何有效地从不同线程生成随机数?
  • 它已更新为包括同步。但是,据我所知,如果共享一个状态,它们仍然会相互碰撞。 “如何有效地从不同线程生成随机数?” -> 它已经在我的帖子中了。
  • 您可能会遇到虚假分享 - 请参阅 *.com/questions/52593588/…
最近更新 更多