【问题标题】:Parallel doseq for ClojureClojure 的平行剂量
【发布时间】:2012-06-10 14:56:43
【问题描述】:

我根本没有在 Clojure 中使用过多线程,所以不确定从哪里开始。

我有一个doseq,它的主体可以并行运行。我想要的是总是有 3 个线程在运行(留下 1 个核心空闲)并行评估主体,直到范围用完。没有共享状态,没有什么复杂的——相当于 Python 的多处理就可以了。

比如:

(dopar 3 [i (range 100)]
  ; repeated 100 times in 3 parallel threads...
  ...)

我应该从哪里开始寻找?有这个命令吗?标准包装?一个好的参考?

到目前为止,我已经找到 pmap,并且可以使用它(我如何一次限制为 3 个?looks like it uses 32 at a time - 不,消息来源说 2 + 处理器数量),但似乎这是一个基本的应该已经存在于某处的原语。

澄清:我真的很想控制线程数。我有长时间运行的进程并使用大量内存,因此创建大量进程并希望一切正常并不是一个好方法 (example which uses a significant chunk available mem)。

更新:开始编写一个宏来执行此操作,我需要一个信号量(或互斥体,或我可以等待的原子)。 Clojure 中是否存在信号量?或者我应该使用 ThreadPoolExecutor?不得不从 Java 中提取这么多东西似乎很奇怪——我认为 Clojure 中的并行编程应该很容易......也许我在想这个完全错误的方式?嗯。代理?

【问题讨论】:

    标签: multithreading clojure parallel-processing


    【解决方案1】:

    好的,我认为我想要的是每个循环都有一个agent,数据使用send发送到代理。使用send 触发的代理是从线程池运行的,因此数量在某种程度上是有限的(它不能精确控制三个线程,但现在必须这样做)。

    [Dave Ray 在 cmets 中解释:要控制池大小,我需要自己编写]

    (defmacro dopar [seq-expr & body]
      (assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
      (let [[k v] seq-expr]
        `(apply await
           (for [k# ~v]
             (let [a# (agent k#)]
               (send a# (fn [~k] ~@body))
             a#)))))
    

    可以这样使用:

    (deftest test-dump
      (dopar [n (range 7 11)]
        (time (do-dump-single "/tmp/single" "a" n 10000000))))
    

    耶!作品!我好棒! (好吧,Clojure 也有点摇滚)。 Related blog post.

    【讨论】:

    • 如果你想控制线程池,你需要自己构建。 Clojure 通过提供用于减少突变的工具(默认不变性、stm 等)使并发更简单,但是开箱即用,如果您需要细粒度的线程控制,超出代理的范围,它希望您遵循 java.util.concurrent和期货提供。
    • 是的,谢谢。我从某人那里找到了一篇博客文章,展示了如何做到这一点(会发布,但又丢失了。)。
    【解决方案2】:

    pmap 在大多数情况下实际上可以正常工作 - 它为您的机器使用具有合理数量线程的线程池。我不会费心尝试创建自己的机制来控制线程数,除非您有真正的基准证据表明默认值会导致问题。

    话虽如此,如果你真的想限制最多三个线程,一个简单的方法是只对范围的 3 个子集使用 pmap:

    (defn split-equally [num coll] 
      "Split a collection into a vector of (as close as possible) equally sized parts"
      (loop [num num 
             parts []
             coll coll
             c (count coll)]
        (if (<= num 0)
          parts
          (let [t (quot (+ c num -1) num)]
            (recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t)))))) 
    
    (defmacro dopar [thread-count [sym coll] & body]
     `(doall (pmap 
        (fn [vals#]
          (doseq [~sym vals#]
            ~@body))  
        (split-equally ~thread-count ~coll))))
    

    注意doall 的使用,这是强制评估pmap(这是惰性的)所必需的。

    【讨论】:

    • 这很好,谢谢,但我认为它假设所有任务都在同一时间持续(他们不会)。
    • ...或者有这么多,平均法则可以解决问题。我并不是想变得困难,但这也不是最佳选择。
    • dorun 是一个不错的选择,而不是 doall 如果你不想持有对头部的引用(更少的内存使用)
    【解决方案3】:

    实际上现在有一个library 可以做到这一点。来自他们的github

    claypoole 库提供基于线程池的 Clojure 函数并行版本,例如 pmapfuturefor

    它提供相同的有序/无序版本。

    【讨论】:

    • 多么可悲,毕竟 clojure 的并发性大肆宣传......如果不回退到 Java 或那个额外的库,就无法做很多事情
    • 我花了一段时间才发现,claypoole 库中 doseq 的并行版本被称为 pdoseq
    【解决方案4】:

    为什么不直接使用 pmap?您仍然无法控制线程池,但它比编写使用代理的自定义宏(为什么不使用期货?)要少得多。

    【讨论】:

    • 我希望发送的线程池小于 32(但我不知道是不是这样)。会看期货,谢谢。
    • 是否可以阻止一组期货,直到有一个期货可用?如果是这样,我认为这将提供我想要的细粒度控制...
    • Send 的线程池大致就是你拥有的处理器数量。 Future 使用无限线程池,但 pmap 避免一次旋转太多。
    • afact(参见上面问题中链接的答案)pmap 使用 32(它块)。
    • 仅当您的输入序列被分块时。
    【解决方案5】:

    我对以下要求有类似的问题:

    1. 控制使用的线程数;
    2. 不知道线程池的管理;
    3. 任务的顺序不需要保留;
    4. 任务的处理时间可以不同,因此任务的顺序一定不能保持,但较早完成的任务应较早返回;
    5. 懒惰地评估和提交输入序列;
    6. 输入序列中的元素不应被越界读取,而应根据返回的结果进行缓冲和读取,以避免出现内存不足的问题。

    核心pmap函数只满足最后两个假设。

    这是一个满足这些假设的实现,它使用标准 Java 线程池 ExecutorServiceCompletionService 以及输入流的一些分区:

    (require '[clojure.tools.logging :as log])
    
    (import [java.util.concurrent ExecutorService ExecutorCompletionService 
                                  CompletionService Future])
    
    (defn take-seq
      [^CompletionService pool]
      (lazy-seq
       (let [^Future result (.take pool)]
         (cons (.get result)
               (take-seq pool)))))
    
    (defn qmap
      [^ExecutorService pool chunk-size f coll]
      (let [worker (ExecutorCompletionService. pool)]
        (mapcat
         (fn [chunk]
           (let [actual-size (atom 0)]
             (log/debug "Submitting payload for processing")
             (doseq [item chunk]
               (.submit worker #(f item))
               (swap! actual-size inc))
             (log/debug "Outputting completed results for" @actual-size "trades")
             (take @actual-size (take-seq worker))))
         (partition-all chunk-size coll))))
    

    可以看出qmap并没有实例化线程池本身,而只是ExecutorCompletionService。例如,这允许传入固定大小的ThreadPoolExecutorService。此外,由于qmap 返回一个惰性序列,它不能也不能管理线程池资源本身。最后,chunk-size 允许限制输入序列的多少元素被实现并作为任务一次提交。

    下面的代码演示了正确的用法:

    (import [java.util.concurrent Executors])
    
    (let [thread-pool (Executors/newFixedThreadPool 3)]
      (try
        (doseq [result (qmap thread-pool
                             ;; submit no more than 500 tasks at once
                             500 
                             long-running-resource-intensive-fn
                             unboundedly-large-lazy-input-coll)]
          (println result))
        (finally
          ;; (.shutdown) only prohibits submitting new tasks,
          ;; (.shutdownNow) will even cancel already submitted tasks.
          (.shutdownNow thread-pool))))
    

    以下是一些使用的 Java 并发类的文档:

    【讨论】:

    • 谢谢;我没有详细看这个,但你似乎已经理解了这个问题。
    【解决方案6】:

    不确定它是否是惯用的,因为我还是 Clojure 的初学者,但以下解决方案对我有用,而且看起来也很简洁:

    (let [number-of-threads 3
          await-timeout 1000]
      (doseq [p-items (partition number-of-threads items)]
        (let [agents (map agent p-items)]
          (doseq [a agents] (send-off a process))
          (apply await-for await-timeout agents)
          (map deref agents))))
    

    【讨论】:

    • 谢谢,但它与 mikera 的回答没有类似的问题吗?因为事情不是并行流畅地运行,而是分批运行(效率较低)。而且,我认为,mikeras 的效率较低,因为它有更多批次。也就是说,它有一个固定的进程到cpu的映射,不能有效地适应不同进程的不同运行时间。
    • 绝对:它有一个固定的映射(在上面的例子中是 3 个线程)。当您要求“始终运行 3 个线程”时,我认为这就是您想要的。如果您希望系统为您优化它,那么我肯定会使用“pmap”。
    • 当我说我总是希望运行 3 个线程时,我的意思是我总是希望运行 3 个线程。也许我不明白,但在我看来,您的解决方案中有时只有 1 或 2 个线程在运行)。如果我使用 pmap 则有 6 个线程正在运行。非常好心给我指出其他答案,但他们似乎实际上并没有运行三个线程,这就是我所要求的......
    【解决方案7】:

    使用管道和渠道。如果您的操作是 IO 绑定的,这是一个更可取的选项,因为 pmap 的池绑定到 CPU 数量。

    另一个不错的选择是使用代理和发送,它在下面使用 cachedThredPoolExecutor。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-08-02
      • 1970-01-01
      • 1970-01-01
      • 2020-05-09
      • 2017-03-29
      • 2016-03-06
      • 2016-12-08
      • 2016-02-01
      相关资源
      最近更新 更多