【问题标题】:Multi-threaded evaluations over a Channel in julia朱莉娅通道上的多线程评估
【发布时间】:2021-05-04 08:29:52
【问题描述】:

我想同时使用Channel 来生成必须评估函数的值,并使用Threads.@threads 并行化该函数的评估。 在没有任何多线程的情况下,我的 MWE 是:

#####
# Produces input values
function producer(c::Channel)
    nb_inputs = rand(1:10)
    for n=1:nb_inputs
        println("Generating | ",n)
        put!(c,n)
    end
end
#####
# Expensive function
function f(n::Int64)
    println("Running | ",n)
    sleep(2.0)
end
#####
# Evaluates the function over all the input values
function test()
    for n in Channel(producer)
        f(n)
    end
end
#####
test()

这里,producer(c) 返回输入值; f(n) 是一个昂贵的函数(其评估都是相互独立的); test() 根据生产者返回的所有输入值评估函数。 现在,我想在多个线程上并行运行test() 中的for 循环。 天真地,我试过了

Threads.@threads for n in Channel(producer)

然而,这失败了,因为 Channel 的元素数量最初是未知的。 我该如何规避这个问题?

【问题讨论】:

    标签: multithreading julia channel


    【解决方案1】:

    我认为,从您的 MWE 来看,您不知道在运行时之前您将调用 producer() 多少次?如果是这样,您可以在运行时根据该值索引您的循环:

    const nputs = [Threads.nthreads()]  # or whatever you need it to be > 0
    
    # Produces input values
    function producer(c::Channel)
        for n in 1:4
            println("Generating | ", n)
            put!(c,n)
        end
    end
    #####
    # Expensive function
    function f(n::Int64)
        println("Running | ", n)
        sleep(2.0)
    end
    #####
    # Evaluates the function over all the input values
    function test()
        @Threads.threads for n in 1:first(nputs)
            Channel(producer, first(nputs) * 4)
            f(n)
        end
    end
    
    #####
    test()
    

    【讨论】:

    • 不幸的是,我事先不知道生产者将创建的元素数量。我已经更新了我的 MWE 来解决这个限制。看了Julia v1.6 的一些即将推出的功能,我想我的解决方案可能就在这个代码补丁中?
    【解决方案2】:

    遍历Channel 是线程安全的:您可以只使用外部@threads 循环。

    using Base.Threads
    
    ## ------------------------------------------------------------
    # Produces input values
    function producer(c::Channel)
        nb_inputs = rand(1:10)
        for n=1:nb_inputs
            println("Generating | ", n, " at: ", threadid())
            put!(c,n)
        end
    end
    
    ## ------------------------------------------------------------
    # Expensive function
    function f(n::Int64)
        println("Running | ", n, " at: ", threadid())
        sleep(2.0)
    end
    
    ## ------------------------------------------------------------
    # Evaluates the function over all the input values
    function test()
        Ch = Channel(producer)
        @threads for _ in 1:nthreads()
            for n in Ch
                f(n)
            end
        end
    end
    
    ## ------------------------------------------------------------
    test()
    
    ## ------------------------------------------------------------
    # Or in julia > v1.6
    function test()
        Ch = Channel(producer)
        Threads.foreach(Ch) do n
            f(n)
        end
    end
    
    ## ------------------------------------------------------------
    test()
    
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-11-27
      • 2014-04-26
      • 1970-01-01
      • 1970-01-01
      • 2014-12-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多