【问题标题】:Closing a channel at the producer end when all the jobs are finished所有作业完成后在生产者端关闭通道
【发布时间】:2018-04-09 14:16:14
【问题描述】:

对于我的 Mandelbrot explorer 项目,我需要运行几个昂贵的作业,最好是并行运行。我决定尝试对作业进行分块,并在自己的thread 中运行每个块,最终得到类似

(defn point-calculator [chunk-size points]
  (let [out-chan (chan (count points))
        chunked (partition chunk-size points)]

    (doseq [chunk chunked]
      (thread
        (let [processed-chunk (expensive-calculation chunk)]
          (>!! out-chan processed-chunk))))

    out-chan))

points 是要测试的 [real, imaginary] 坐标列表,expensive-calculation 是一个获取块的函数,并测试块中的每个点。每个块可能需要很长时间才能完成(可能需要一分钟或更长时间,具体取决于块大小和作业数量)。

在我的消费者端,我正在使用

(loop []
  (when-let [proc-chunk (<!! result-chan)]
   ; Do stuff with chunk
   (recur)))

消耗每个处理过的块。现在,由于通道仍处于打开状态,因此会在消耗最后一个块时阻塞。

我需要一种在工作完成后关闭频道的方法。由于生产者循环的异步性,这被证明是困难的。我不能简单地将close! 放在doseq 之后,因为循环不会阻塞,并且我不能在最后一个索引作业完成时关闭,因为顺序是不确定的。

我能想到的最好的主意是维护一个(atom #{}) 的工作,并在每个工作完成时保持disj。然后我可以检查循环中设置的大小,close! 当它为 0 时,或者将手表连接到原子并在那里检查。

不过,这似乎很骇人听闻。有没有更惯用的方法来处理这个问题?这种情况是否表明我错误地使用了async

【问题讨论】:

    标签: clojure


    【解决方案1】:

    我会看看core-async 中的take 函数。这就是它的文档所说的:

    "返回一个通道,该通道最多返回来自 ch 的 n 个项目。在 n 个项目之后 已返回,或ch已关闭,返回通道将关闭。 "

    所以它会引导您进行一个简单的修复:您可以将其包装到 take 中,而不是返回 out-chan

    (clojure.core.async/take (count chunked) out-chan)

    应该可以。 另外我建议你重写你的例子,从阻塞 put/get 到停车(&lt;!&gt;!)和threadgo / go-loop,这是核心异步的更惯用用法。

    【讨论】:

    • 谢谢,我稍后再试试。我需要回滚我的 PC -_- 而且我正在使用停车呼叫,但我被引导相信创建大量包含长时间运行进程的 go 块是一个糟糕的设计。
    【解决方案2】:

    您可能希望使用 async/pipeline(-blocking) 来控制并行度。并使用aysnc/onto-chan在所有chunk复制完成后自动关闭输入通道。

    例如下面的示例显示当并行度设置为 16 时,经过时间提高了 16 倍。

    (defn expensive-calculation [pts]
      (Thread/sleep 100)
      (reduce + pts))
    
    (time
     (let [points     (take 10000 (repeatedly #(rand 100)))
           chunk-size 500
           inp-chan   (chan)
           out-chan   (chan)]
       (go-loop [] (when-let [res (<! out-chan)]
                     ;; do stuff with chunk
                     (recur)))
       (pipeline-blocking 16 out-chan (map expensive-calculation) inp-chan)
       (<!! (onto-chan inp-chan (partition-all chunk-size points)))))
    

    【讨论】:

      猜你喜欢
      • 2018-10-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-03-16
      • 2017-10-06
      • 1970-01-01
      • 2017-03-26
      • 1970-01-01
      相关资源
      最近更新 更多