【问题标题】:How to best shut down a clojure core.async pipeline of processes如何最好地关闭进程的 clojure core.async 管道
【发布时间】:2016-01-08 04:40:57
【问题描述】:

我有一个 clojure 处理应用程序,它是一个通道管道。每个处理步骤都异步进行计算(即使用 http-kit 或其他东西发出 http 请求),并将结果放在输出通道上。这样下一步就可以从该通道读取数据并进行计算。

我的主要功能是这样的

(defn -main [args]
 (-> file/tmp-dir
  (schedule/scheduler)
  (search/searcher)
  (process/resultprocessor)
  (buy/buyer)
  (report/reporter)))

目前,调度程序步骤驱动管道(它没有输入通道),并为链提供工作负载。

当我在 REPL 中运行它时:

(-main "some args")

由于调度程序的无限性,它基本上永远运行。更改此架构以使我可以从 REPL 关闭整个系统的最佳方法是什么?关闭每个通道是否意味着系统终止?

一些广播频道会有帮助吗?

【问题讨论】:

  • 不幸的是,这也杀死了 REPL。我会尝试组件方法

标签: clojure core.async


【解决方案1】:

您可以将调度程序 alts! / alts!! 放在终止通道和管道的输入通道上:

(def kill-channel (async/chan))

(defn scheduler [input output-ch kill-ch]
  (loop []
    (let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]]
                  :priority true)]
      (if-not (= p kill-ch)
        (recur))))

kill-channel 上设置一个值将终止循环。

从技术上讲,您也可以使用 output-ch 来控制进程(放置到关闭的通道返回 false),但我通常会发现显式杀死通道更清洁,至少对于顶级管道而言。

为了使事情同时更优雅和更方便使用(在 REPL 和生产中),您可以使用 Stuart Sierra's component,启动调度程序循环(在单独的线程上)和 assoc 杀死通道你的组件在组件的start 方法中,然后在组件的stop 方法中close! 杀死通道(从而终止循环)。

【讨论】:

    【解决方案2】:

    我建议使用https://github.com/stuartsierra/component 之类的东西来处理系统设置。它确保您可以在 REPL 中轻松启动和停止系统。使用该库,您将对其进行设置,以便每个处理步骤都是一个组件,并且每个组件将在其startstop 协议中处理通道的设置和拆卸。此外,您可能会为每个要实现的组件创建一个IStream 协议,并让每个组件依赖于实现该协议的组件。它为您带来了一些非常简单的模块化。

    您最终会得到一个如下所示的系统:

    (component/system-map
     :scheduler (schedule/new-scheduler file/tmp-dir)
     :searcher  (component/using (search/searcher)
                                 {:in :scheduler})
     :processor (component/using (process/resultprocessor)
                                 {:in :searcher})
     :buyer     (component/using (buy/buyer)
                                 {:in :processor})
     :report    (component/using (report/reporter)
                                 {:in :buyer}))
    

    这种方法的一个好处是,如果组件也依赖于通道,您可以轻松添加组件。例如,如果每个组件在内部 mult 上使用 tap 创建其输出通道,则您可以仅通过将处理器作为依赖项的日志记录组件为处理器添加一个记录器。

     :processor (component/using (process/resultprocessor)
                                 {:in :searcher})
     :processor-logger (component/using (log/logger)
                                        {:in processor})
    

    我建议您也观看他的 talk 以了解其工作原理。

    【讨论】:

      【解决方案3】:

      您应该考虑使用Stuart Sierra's reloaded workflow,这取决于将您的“管道”元素建模为components,这样您就可以将您的逻辑单例建模为“类”,这意味着您可以控制构造和销毁(开始/停止) 逻辑。

      【讨论】:

        猜你喜欢
        • 2019-02-13
        • 1970-01-01
        • 2015-07-04
        • 1970-01-01
        • 2015-05-07
        • 2014-03-11
        • 2015-07-11
        • 2016-08-25
        • 1970-01-01
        相关资源
        最近更新 更多