【问题标题】:Clojure - Core.async Pipeline + take confusionClojure - Core.async 管道 + 混淆
【发布时间】:2019-02-13 20:34:56
【问题描述】:

我很难理解我认为 Clojure 的异步库中的一个非常简单的概念。我实质上是在使用管道创建两个通道,其中输出通道是使用输入通道的 take 函数创建的。

据我了解, take 的目的是限制通道在关闭之前将接收的项目数(如果此时输入通道尚未关闭)。但是,我一直在使用的代码示例并没有产生我预期的结果。

以如下代码为例:

(def in (chan 1))
(def out (async/take 5 in 1))

(doseq [i (range 10)]
  (go (>! in i)))

(pipeline 4 out (filter even?) in)

(go-loop []
  (when-some [val (<! out)]
    (println val)
    (recur))))

我预计会发生的是,管道会过滤掉奇数,并且只将偶数传递给“输出”通道,当输出通道收到 5 个偶数时,它会关闭。然而,我看到的是打印到 REPL 的奇数和偶数,如下所示:

2 7 4 0 8 6

此时输出通道仍未关闭,第二次运行 doseq 将在最终关闭之前打印一些其他值。

我对这里发生的事情感到非常困惑,它在使用 take 而不是管道时就像一个魅力,在不使用 take 但仍然使用管道时它也有效,将两者结合使用是完全不同的故事似乎。我在这里遗漏了一些明显的东西吗?抱歉,如果这是一个简单的错误,这是我第一次(尽管很天真)尝试使用 core.async。

【问题讨论】:

    标签: asynchronous clojure core.async


    【解决方案1】:

    您已将takepipeline 置于竞争中。他们俩都从in 获取项目并将它们添加到out。替换out的定义:

    (def out (async/chan 3))
    

    例如,得到预期的结果

    0
    2
    4
    6
    8
    

    如果你真的想使用async/take,你可以这样做:

    (def first (async/chan 1))
    (def second (async/chan 3))
    (pipeline 4 second (filter even?) first)
    (def third (async/take 3 second))
    
    (defn run []
      (go
        (doseq [i (range 10)]
          (>! first i)))
      (go (loop []
            (when-some [val (<! third)]
              (println val)
              (recur)))))
    

    结果:

    0
    2
    4
    

    【讨论】:

    • 啊!完全有道理。感谢您的帮助:)
    猜你喜欢
    • 2015-07-11
    • 2015-07-04
    • 1970-01-01
    • 1970-01-01
    • 2016-01-08
    • 2015-07-03
    • 1970-01-01
    • 1970-01-01
    • 2021-09-03
    相关资源
    最近更新 更多