【问题标题】:Julia: Parallel for loop over partitions iteratorJulia:分区迭代器的并行 for 循环
【发布时间】:2015-07-15 22:07:15
【问题描述】:

所以我正在尝试迭代某些东西的分区列表,比如1:n 用于 13 到 21 之间的一些 n。理想情况下,我想要运行的代码如下所示:

valid_num = @parallel (+) for p in partitions(1:n)
  int(is_valid(p))
end

println(valid_num)

这将使用@parallel for 来减少我的问题。例如,将此与 Julia 文档中的示例进行比较:

nheads = @parallel (+) for i=1:200000000
  Int(rand(Bool))
end

但是,如果我尝试调整循环,我会收到以下错误:

ERROR: `getindex` has no method matching getindex(::SetPartitions{UnitRange{Int64}}, ::Int64)
 in anonymous at no file:1433
 in anonymous at multi.jl:1279
 in run_work_thunk at multi.jl:621
 in run_work_thunk at multi.jl:630
 in anonymous at task.jl:6

我认为这是因为我试图迭代不是1:n 形式的东西(编辑:我认为这是因为如果p=partitions(1:n),你不能调用p[3])。

我尝试使用pmap 来解决这个问题,但是因为分区的数量会变得非常大,非常快(1:13 的分区超过 250 万个,当我到达 1:21 时将是巨大的),构建如此大的数组成为一个问题。我让它运行了一夜,它仍然没有完成。

有人对我如何在 Julia 中有效地做到这一点有任何建议吗?我可以使用约 30 核计算机,而且我的任务似乎很容易并行化,所以如果有人知道在 Julia 中执行此操作的好方法,我将不胜感激。

非常感谢!

【问题讨论】:

  • 我认为您可以将for p in partitions(1:n) 替换为for p in collect(partitions(1:n))。但是,根据您问题的大小,您可能会遇到内存问题......此外,我很惊讶您无法迭代 partitions 的输出,因为我认为这是 this issue here 处理的。尽管如此,索引到partitions 的输出在我的机器上也不起作用,所以我显然遗漏了一些东西......
  • 我希望可以。如果我尝试for p in collect(partitions(1:21))),我知道我必须这样做,我会遇到内存问题:(

标签: parallel-processing julia


【解决方案1】:

下面的代码给出了 511,即一组 10 个中大小为 2 的分区数。

using Iterators
s = [1,2,3,4,5,6,7,8,9,10]
is_valid(p) = length(p)==2
valid_num = @parallel (+) for i = 1:30
  sum(map(is_valid, takenth(chain(1:29,drop(partitions(s), i-1)), 30)))
end

此解决方案结合了taketh、drop 和chain 迭代器,以获得与下面PREVIOUS ANSWER 下的take_every 迭代器相同的效果。请注意,在此解决方案中,每个进程都必须计算每个分区。但是,因为每个进程对drop 使用不同的参数,所以没有两个进程会在同一个分区上调用 is_valid。

除非您想通过大量数学计算来确定如何实际跳过分区,否则无法避免在至少一个进程上按顺序计算分区。我认为西蒙的回答是在一个进程上执行此操作并分配分区。我的要求每个工作进程自己计算分区,这意味着计算是重复的。但是,它是并行复制的,这(如果您实际上有 30 个处理器)不会花费您的时间。

以下是有关如何实际计算分区上的迭代器的资源:@​​987654321@。

以前的答案(比必要的更复杂)

我在写我的时候注意到了西蒙的回答。我们的解决方案似乎与我相似,除了我使用迭代器来避免将分区存储在内存中。我不确定哪种尺寸设置实际上会更快,但我认为两种选择都很好。假设计算 is_valid 比计算分区本身花费的时间要长得多,您可以执行以下操作:

s = [1,2,3,4]
is_valid(p) = length(p)==2
valid_num = @parallel (+) for i = 1:30
  foldl((x,y)->(x + int(is_valid(y))), 0, take_every(partitions(s), i-1, 30))
end

这给了我 7,即一组 4 的大小为 2 的分区数。take_every 函数返回一个迭代器,该迭代器返回从第 i 个开始的每 30 个分区。这是代码:

import Base: start, done, next
immutable TakeEvery{Itr}
  itr::Itr
  start::Any
  value::Any
  flag::Bool
  skip::Int64
end
function take_every(itr, offset, skip)
  value, state = Nothing, start(itr)
  for i = 1:(offset+1)
    if done(itr, state)
      return TakeEvery(itr, state, value, false, skip)
    end
    value, state = next(itr, state)
  end
  if done(itr, state)
    TakeEvery(itr, state, value, true, skip)
  else
    TakeEvery(itr, state, value, false, skip)
  end
end
function start{Itr}(itr::TakeEvery{Itr})
  itr.value, itr.start, itr.flag
end
function next{Itr}(itr::TakeEvery{Itr}, state)
  value, state_, flag = state
  for i=1:itr.skip
    if done(itr.itr, state_)
      return state[1], (value, state_, false)
    end
    value, state_ = next(itr.itr, state_)
  end
  if done(itr.itr, state_)
    state[1], (value, state_, !flag)
  else
    state[1], (value, state_, false)
  end
end
function done{Itr}(itr::TakeEvery{Itr}, state)
  done(itr.itr, state[2]) && !state[3]
end

【讨论】:

  • Iterators 包 (github.com/JuliaLang/Iterators.jl) 中的 taketh 函数似乎做了 TakeEvery 正在做的事情。但是看看你的代码,我想知道你为什么要创建一个 TakeEvery 类型。我还没有在 Julia 中制作任何数据结构,所以我很好奇何时制作/不制作新的数据结构。
  • taketh 是否允许偏移?对我来说似乎没有,这就是我自己制作的原因。如果可能的话,我总是不喜欢自己制作数据结构,但是 Julia 太新了,有时我需要的功能还没有实现。我也对 Julia 生态系统中的内容了解不多。
  • 哦,不过貌似他们有drop功能,可以和taketh结合来得到你想要的。
  • 编辑了我的答案以使用 taketh、drop 和 chain 而不是 take_every。
  • 啊,我明白了,谢谢!此外,如果有人好奇,我尝试了 jcrudy 的解决方案(仅针对一组基因),它花了……18 个小时。韦尔普。
【解决方案2】:

一种方法是将问题分成不太大而无法实现的部分,然后并行处理每个部分中的项目,例如如下:

function my_take(iter,state,n)
    i = n
    arr = Array[]
    while !done(iter,state) && (i>0)
        a,state = next(iter,state)
        push!(arr,a)
        i = i-1
    end
    return arr, state
end

function get_part(npart,npar)
    valid_num = 0
    p = partitions(1:npart)
    s = start(p)
    while !done(p,s)
        arr,s = my_take(p,s,npar)
        valid_num += @parallel (+) for a in arr
            length(a)
        end
    end
    return valid_num
end

valid_num = @time get_part(10,30)

我打算使用take() 方法从迭代器中实现最多npar 项,但take() 似乎已被弃用,因此我包含了我自己的实现,我称之为my_take()。因此getPart() 函数使用my_take() 一次获取最多npar 个分区并对它们进行计算。在这种情况下,计算只是将它们的长度相加,因为我没有 OP 的 is_valid() 函数的代码。 get_part() 然后返回结果。

因为length() 计算不是很耗时,所以这段代码在并行处理器上运行时实际上比在单处理器上运行时要慢:

$ julia -p 1 parpart.jl
elapsed time: 10.708567515 seconds (373025568 bytes allocated, 6.79% gc time)

$ julia -p 2 parpart.jl
elapsed time: 15.70633439 seconds (548394872 bytes allocated, 9.14% gc time)

或者,pmap() 可以用于问题的每个部分,而不是并行的 for 循环。

关于内存问题,当我用 4 个工作进程运行 Julia 时,实现来自 partitions(1:10) 的 30 个项目在我的 PC 上占用了近 1 GB 的内存,所以我希望实现即使是 partitions(1:21) 的一小部分也需要很大的内存处理内存。在尝试这样的计算之前,可能需要估计需要多少内存以查看它是否可能。

关于计算时间,注意:

julia> length(partitions(1:10))
115975

julia> length(partitions(1:21))
474869816156751

...因此,即使是 30 个内核上的高效并行处理也可能不足以在合理的时间内解决更大的问题。

【讨论】:

  • 太棒了!有趣的是,我刚刚注意到,当我使用 Iterators 包时,不再不推荐使用 take。如果你使用它,你必须反复调用 take and drop;看看这个交易如何与您的 my_take 方法相抵触会很有趣。我有预感你会更快。
  • 另外,如果有人好奇,我尝试了 Simon 的解决方案,用于一组大小为 15 的分区(我也只检查了大小为 5 的分区,因为我想避免部分大小为
  • @Uthsav Chitra:感谢您的反馈。在我的方法中,线程之间的唯一通信是在计算分区的单个线程和检查有效性(或长度,在我的示例中)的并行线程之间,以及所有并行线程必须完成的事实。但是,对于大小为 15 的集合,一次实现(例如)30 个分区所需的内存量可能仍大于可用 RAM。 jcrudy 的解决方案为每个处理器一次只实现一个分区,似乎避免了这个困难。很高兴看到您有解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-07-17
  • 2020-07-30
  • 2015-06-06
相关资源
最近更新 更多