【发布时间】:2020-10-06 14:04:00
【问题描述】:
我正在使用 clojure 编写一个脚本,以从文件中读取一系列 URI 作为输入,并报告它们的状态代码。
我已经使用clojure.core.async/pipeline-async 实现了这一点,以执行对 URI 的 HTTP 调用(使用 httpkit 异步调用)。
我想监控脚本的执行,所以我有一个状态原子:
(let [processing (atom [(System/currentTimeMillis) 0])]
还有一个跟踪进度的函数。
(defn track-progress [total progress]
(swap! progress
(fn [[time count]]
(let [incremented-count (inc count)
now (System/currentTimeMillis)]
(if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
(do
(println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
[now incremented-count])
[time incremented-count])))))
在 HTTP 调用之后使用它:
(a/pipeline-async
parallelism
output-chan
(fn [[an-id uri] result]
(http/head uri {:throw-exceptions false
:timeout timeout}
(fn [{:keys [status error]}]
(track-progress total processing)
(a/go
(if (nil? error)
(do (a/>! result [an-id (keyword (str status))])
(a/close! result))
(do (a/>! result [an-id :error])
(a/close! result)))))))
input-chan)
processing 原子是在 let 表达式中创建的,使用 pipeline-async 部分。
除了那个日志之外,一切似乎都运行良好。
我发现有时日志记录很奇怪,有这样的东西:
Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms
Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms
Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms
输出被格式化为在shell中写入,似乎有时相同的println被多次执行,或者传递给swap!函数的fn被并行执行(没有并发)原子。
(如果println我删除str来创建要打印的字符串,那么我多次具有相同进度的行完全混合在一起,就像ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms一样)
我的代码有问题吗?
或者我弄错了atom,因为我认为它不允许并行执行改变其状态的函数?
【问题讨论】:
标签: asynchronous concurrency clojure