【发布时间】:2018-04-09 14:16:14
【问题描述】:
对于我的 Mandelbrot explorer 项目,我需要运行几个昂贵的作业,最好是并行运行。我决定尝试对作业进行分块,并在自己的thread 中运行每个块,最终得到类似
(defn point-calculator [chunk-size points]
(let [out-chan (chan (count points))
chunked (partition chunk-size points)]
(doseq [chunk chunked]
(thread
(let [processed-chunk (expensive-calculation chunk)]
(>!! out-chan processed-chunk))))
out-chan))
points 是要测试的 [real, imaginary] 坐标列表,expensive-calculation 是一个获取块的函数,并测试块中的每个点。每个块可能需要很长时间才能完成(可能需要一分钟或更长时间,具体取决于块大小和作业数量)。
在我的消费者端,我正在使用
(loop []
(when-let [proc-chunk (<!! result-chan)]
; Do stuff with chunk
(recur)))
消耗每个处理过的块。现在,由于通道仍处于打开状态,因此会在消耗最后一个块时阻塞。
我需要一种在工作完成后关闭频道的方法。由于生产者循环的异步性,这被证明是困难的。我不能简单地将close! 放在doseq 之后,因为循环不会阻塞,并且我不能在最后一个索引作业完成时关闭,因为顺序是不确定的。
我能想到的最好的主意是维护一个(atom #{}) 的工作,并在每个工作完成时保持disj。然后我可以检查循环中设置的大小,close! 当它为 0 时,或者将手表连接到原子并在那里检查。
不过,这似乎很骇人听闻。有没有更惯用的方法来处理这个问题?这种情况是否表明我错误地使用了async?
【问题讨论】:
标签: clojure