【问题标题】:Clojure swap! atom executed in parallelClojure 交换!并行执行的原子
【发布时间】:2020-10-06 14:04:00
【问题描述】:

我正在使用 clojure 编写一个脚本,以从文件中读取一系列 URI 作为输入,并报告它们的状态代码。

我已经使用clojure.core.async/pipeline-async 实现了这一点,以执行对 URI 的 HTTP 调用(使用 httpkit 异步调用)。

我想监控脚本的执行,所以我有一个状态原子:

(let [processing (atom [(System/currentTimeMillis) 0])]

还有一个跟踪进度的函数。

(defn track-progress [total progress]
  (swap! progress 
         (fn [[time count]]
            (let [incremented-count (inc count)
                  now (System/currentTimeMillis)]
              (if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
                (do
                  (println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
                  [now incremented-count])
                [time incremented-count])))))

在 HTTP 调用之后使用它:

(a/pipeline-async
      parallelism
      output-chan
      (fn [[an-id uri] result]
        (http/head uri {:throw-exceptions false
                        :timeout timeout}
                   (fn [{:keys [status error]}]
                     (track-progress total processing)
                     (a/go 
                        (if (nil? error)
                            (do (a/>! result [an-id (keyword (str status))])
                                (a/close! result))
                            (do (a/>! result [an-id :error])
                                (a/close! result)))))))
      input-chan)

processing 原子是在 let 表达式中创建的,使用 pipeline-async 部分。
除了那个日志之外,一切似乎都运行良好。 我发现有时日志记录很奇怪,有这样的东西:


Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms

Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms

Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms

输出被格式化为在shell中写入,似乎有时相同的println被多次执行,或者传递给swap!函数的fn被并行执行(没有并发)原子。 (如果println我删除str来创建要打印的字符串,那么我多次具有相同进度的行完全混合在一起,就像ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms一样)

我的代码有问题吗?
或者我弄错了atom,因为我认为它不允许并行执行改变其状态的函数?

【问题讨论】:

    标签: asynchronous concurrency clojure


    【解决方案1】:

    Clojure atom 是专门设计的,因此在多线程程序中,可以有多个线程在单个 atom 上执行 swap!,如果您的程序这样做,那些更新函数 f 将被赋予 swap! 可以同时运行。 swap! 唯一同步的部分是“比较和交换”操作,它有效地执行了:

    • 锁定原子的状态
    • 检查它的当前值是否是identical? 到它在f 开始执行之前包含的引用,如果是,则将其替换为f 返回的新对象。
    • 解锁原子的状态”。

    函数f可能需要很长时间才能从当前值计算出一个新值,但上面的关键部分只是一个指针比较,如果相等,则进行指针赋值。

    这就是为什么swap! 的文档字符串说“注意 f 可能会被多次调用,因此应该没有副作用。”

    【讨论】:

    • 如果你改变你的函数track-progress,这样你传递给swap!的函数只计算和返回一个新的向量,然后之后你做了一个if 语句来 println 你想要什么,有条件地,那么你不应该看到多个具有相同 increment-count 值的日志行。您仍然可能会在同一行上看到多个,因为 println 不是原子的。
    • 知道了,所以我误解了swap!,谢谢你澄清它是如何工作的!
    • 或者如果你想线性化所有东西,你可以使用代理而不是原子。
    【解决方案2】:

    您想要的是从一组并发执行的线程中序列化输出流。您可以使用代理来序列化对一段可变状态的访问,但在这里您有一个没有状态的退化案例,只有副作用。对于这种情况,您只需要the locking function

    一个例子:

    (ns tst.demo.core
      (:use demo.core tupelo.core tupelo.test))
    
    (defn do-println
      [& args]
      (apply println args))
    
    (def lock-obj (Object.))
    (defn do-println-locking
      [& args]
      (locking lock-obj
        (apply println args)))
    
    (def sleep-millis 500)
    (defn wait-and-print
      [print-fn id]
      (Thread/sleep sleep-millis)
      (print-fn (format "wait-and-print %s is complete" id)))
    
    (defn start-threads
      [print-fn count]
      (println "-----------------------------------------------------------------------------")
      (let [futures (forv [i (range count)]
                      (future (wait-and-print print-fn i)))]
        (doseq [future futures]
          ; block until future is complete
          (deref future))))
    
    (dotest
      (start-threads do-println 10)
      (start-threads do-println-locking 10))
    

    典型结果:

    --------------------------------------
       Clojure 1.10.2-alpha1    Java 15
    --------------------------------------
    
    Testing tst.demo.core
    -----------------------------------------------------------------------------
    wait-and-print 4 is completewait-and-print 3 is completewait-and-print 2 is complete
    wait-and-print 8 is completewait-and-print 9 is complete
    wait-and-print 6 is completewait-and-print 1 is complete
    
    wait-and-print 7 is complete
    wait-and-print 0 is complete
    
    wait-and-print 5 is complete
    
    
    -----------------------------------------------------------------------------
    wait-and-print 5 is complete
    wait-and-print 8 is complete
    wait-and-print 7 is complete
    wait-and-print 9 is complete
    wait-and-print 6 is complete
    wait-and-print 3 is complete
    wait-and-print 0 is complete
    wait-and-print 4 is complete
    wait-and-print 2 is complete
    wait-and-print 1 is complete
    

    因此您可以看到来自locking 的未经序列化的输出是混乱的,而在第二种情况下,每个println 都允许一次完成一个(即使顺序仍然是随机的)。

    如果println 一次打印一个字符而不是一次打印一个字符串,则不同步情况下的结果会更加混乱。修改输出函数分别打印每个字符:

    (defn do-println
      [& args]
      (doseq [ch (str/join args)]
        (print ch))
      (newline))
    
    (def lock-obj (Object.))
    (defn do-println-locking
      [& args]
      (locking lock-obj
        (apply do-println args)))
    

    典型结果:

    --------------------------------------
       Clojure 1.10.2-alpha1    Java 15
    --------------------------------------
    
    Testing tst.demo.core
    -----------------------------------------------------------------------------
    wwwwwaaawwiiiattti--taaa--nnaiddnaa--dwpp-irrptaiir-niiantnttn  -dw2ta-  ani96ipds trn- i-pcndrota-impn nrpd4itl- n eipt5tr s e7i 
     incisots   mc0cpo olmmieppstll ee
    etctteo
    e-
     amnidps-l pectroeai
    intt- a1n di-sip rcsio nmctmpo plm3lew etaiei
    spt t-lceeatone
    d
    m-pplreitnet
     8 is complete
    -----------------------------------------------------------------------------
    wait-and-print 3 is complete
    wait-and-print 9 is complete
    wait-and-print 8 is complete
    wait-and-print 4 is complete
    wait-and-print 6 is complete
    wait-and-print 7 is complete
    wait-and-print 0 is complete
    wait-and-print 1 is complete
    wait-and-print 5 is complete
    wait-and-print 2 is complete
    

    但我们看到locking 将函数调用序列化,因此活动调用必须在下一个开始之前完成。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-12-10
      • 1970-01-01
      • 2021-04-17
      • 2017-05-29
      相关资源
      最近更新 更多