【问题标题】:Clojure - Incrementing numbers in a list efficiently and concurrentlyClojure - 有效地同时增加列表中的数字
【发布时间】:2014-01-08 23:26:32
【问题描述】:

简短版:在 Clojure 中存储数百个数字的列表的正确方法是什么,其中每个数字都被递增数百万次(可能跨多个线程)?

长版:程序从一个空向量开始,其中每个值都初始化为0:

[0 0 0 0 0 0 0 0 0 ...]

然后它会逐行读取数百万行的文件。在对一行执行一些任意计算之后,程序会增加向量中的一些值。在第一行之后,向量可能如下所示:

[1 1 1 2 0 1 0 1 1 ...]

第二行之后:

[2 2 3 2 2 1 0 2 2 ...]

大约 5000 行之后,它可能看起来像:

[5000 4998 5008 5002 4225 5098 5002 5043 ...]

由于 Clojure 的数据结构是不可变的,因此仅使用 assoc 来增加向量中的值似乎非常浪费,因为每次递增都会复制整个向量。

在不花费所有 CPU 时间复制不可变数据结构的情况下进行这种并发数据聚合的正确方法是什么?我是否应该有一个向量,其中每个元素都类似于 ref 或 atom,所有线程都增加这些共享值?或者,是否有某种线程级的数据结构可以存储计数,然后最后一步是合并来自每个线程的计数?

这可能不会在单个线程上绑定 I/O,所以我猜我会将行处理拆分到多个线程中。向量的长度没有限制(可能有几千个元素长),但很可能是大约 100 个元素长。

【问题讨论】:

    标签: multithreading performance data-structures concurrency clojure


    【解决方案1】:

    Clojure 的 vector 是持久数据结构。更新向量中的元素时,它不会复制整个元素,并且需要基本恒定的时间,这意味着 O(log32 n)。

    但您似乎每次迭代都会更新向量中的几乎每个元素。也许你想推荐Transient Data Structures

    【讨论】:

      【解决方案2】:

      一种方法是将向量创建为原子向量(而不是值),然后同时更新向量中的原子。

      (def len 1000)
      
      (def vec-data (into [] (repeatedly len #(atom 0))))
      
      ;Create 10 future (threads) that update the vector atoms concurrently    
      (doall (for [_ (range 10)]
                  (future (doall (map #(swap! (vec-data %) inc) (range len) )))))
      

      【讨论】:

        【解决方案3】:

        我建议如下:

        • 使用core.matrix API 进行向量运算。
        • 使用可变向量实现,可能是vectorz-clj(假设双精度数字适合您?)
        • 使用(zero-vector n) 创建任意大小的累加器向量
        • 每个线程将在一行上进行计算,生成一个新向量以添加到累加器中
        • 然后调用(add! accumulator ...) 对累加器向量进行可变加法(如果您担心线程安全,可以使用代理或其他一些并发技术来序列化这些加法)

        【讨论】:

          【解决方案4】:

          我想知道 core.matrix 是否可以用于此目的。这有点矫枉过正,但会使更新变得容易。如果您探索这条路线,我建议您尝试不同的实现(ndarray、vectorz 和 clatrix 支持可变性),看看哪种方式最适合您正在做的事情。

          【讨论】:

            【解决方案5】:

            哦,你确实说过并发,直到我发布后我才错过。对不起。在我的帖子中向下滚动,看看我建议如何同时进行。

            您可以在 Clojure 中使用 java 字节数组或长数组。您只需要小心控制如何使用它们。

            例如,这里有一个质数筛,它演示了两件事。

            首先它使用一个字节数组,并使用 aset-byte 设置数组中的字节,然后 get 访问这些字节。请参阅设置 [标志(字节数组大小)] 的第一个 let 语句。您还可以使用 (long-array size) 返回一个长数组。但是你应该使用 (aset-long array index value) 来设置那个长数组中的值。

            第二个但与您的问题没有直接关系,它使用瞬态向量(标准 Clojure 功能)在循环的末尾构建结果,然后在返回持久向量之前将该向量转换为持久向量。

            (defn sieve1
              "Generate a vector of all prime numbers up to maxN.
               maxN must be 2 or greater."
              [maxN]
            
              (when (< maxN 2)
                (throw (java.lang.IllegalArgumentException. (str "parameter maxN (" maxN ") must be 2 or greater."))))
            
              (let [size (inc maxN) ; because array is zero based
                    ;nSqrt (dbmath/isqrt maxN)
                    flags (byte-array size)]
                ;(println (format "maxN: %s; size: %s; nSqrt: %s" maxN size, nSqrt))
            
                ; Set all flags.
                (loop [i 0]
                    (when (<= i maxN)
                      (aset-byte flags i 1)
                      (recur (inc i))))
            
                ; Strike out all non primes before two.
                ; (zero and one are not prime.)
                (aset-byte flags 0 0)
                (aset-byte flags 1 0)
            
                ; Strike out multiples of 2.
                ;(println "strike out multiples of two.")
                (loop [j 4]
                  (when (<= j maxN)
                    ;(println (format "aset %s 0" j))
                    (aset-byte flags j 0)
                    (recur (+ j 2))))
            
                ; Strike out multiples of primes (only odd primes are now remaining)
                ;(println "strike out multiples of primes.")
                (loop [i 3]
                  (when (<= i maxN)
                    (when (= 1 (aget flags i))
                      ; found that i is prime.
                      ;(println (format "discovered i is prime: i=%s;" i))
            
                      ; Strike out multiples of i, starting with i^2.
                      (loop [j (* i i)]
                        (when (<= j maxN)
                          ;(println (format "aset %s 0" j))
                          (aset-byte flags j 0)
                          (recur (+ j i))))
                      )
                    (recur (+ i 2))))
            
                ; Build result.
                (let [primes (transient [2])]
                  (loop [i 3]
                    (when (<= i maxN)
                      (when (= 1 (aget flags i))
                        (conj! primes i))
                      (recur (+ i 2))
                      ))
                  (persistent! primes))
                ))
            

            为什么我使用字节数组作为标志,然后使用瞬态向量来创建结果向量?为了让它快!所有字节数组内容和瞬态向量内容都完全发生在例程中,在单个线程上,并且不会泄漏。试试 (sieve1 10000000) 一千万,看看有多快。

            同时进行。

            如果你在一个原子中放置一个长数组会怎样。然后使用 Clojure 的交换!获得并发。交换!将保证一次只有一个线程将原子(长数组)的内容与原子的新值交换(即使您的交换函数可能只返回相同的长数组,但在更改了一些长数组之后数组中的值)。只要你所有的线程都同意不改变长数组,除非使用交换!那么我看不出有什么问题。

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2018-02-21
              • 2015-10-01
              相关资源
              最近更新 更多