【问题标题】:Clojure - core.async interface for apache kafkaClojure - apache kafka 的 core.async 接口
【发布时间】:2015-09-14 21:33:39
【问题描述】:

我正在使用clj-kafka,我正在尝试在 REPL 中为其创建一个core.async 接口。

我收到了一些消息,但我的结构感觉不对:我要么无法停止接收消息,要么必须再次启动 go 例程才能接收更多消息。

这是我的尝试:

(defn consume [topic]
  (let [consume-chan (chan)]
    (with-resource [c (consumer config)]
      shutdown
      (go (doseq [m (messages c "test")]
                   (>! chan message) ;; should I check the return value?
                   )))
    consume-chan)) ;; is it the right place to return a channel ?


  (def consume-chan (consume "test"))
  ;;(close! consume-chan)

  (go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already

  (def cons-ch (go
                 (with-resource [c (consumer config)]
                   shutdown
                   (doseq [m (messages c "test")]
                     (>! consume-chan m)))))  ;; should I check something here ?

  ;;(close! cons-ch)

  (def go-ch
    (go-loop []
      (if-let [km (<! consume-chan)]
        (do  (println "Got a value in this loop:" km)
              (recur))
        (do (println "Stop recurring - channel closed")))))

  ;;(close! go-ch)

如何使用 core.async 接口使用惰性消息序列?

【问题讨论】:

    标签: asynchronous clojure apache-kafka core.async clj-kafka


    【解决方案1】:

    我会这样做:

    • &gt;!&lt;! 如果通道关闭,则返回 nil,因此请确保在发生这种情况时退出循环 - 这样您就可以通过关闭通道轻松地从外部结束循环。

    • 使用 try/catch 检查 go 块内的异常,并将任何异常作为返回值,以免丢失。

    • 检查读取值是否存在异常,以捕获通道内的任何内容。

    • go 块返回一个通道,块内代码的返回值(如上面的异常)将放在通道上。检查这些渠道是否有异常,可能会重新抛出。

    您现在可以像这样写入频道:

    (defn write-seq-to-channel
      [channel
       values-seq]
      (a/go
        (try
          (loop [values values-seq]
            (when (seq values)
              (when (a/>! channel (first values))
                (recur (rest values)))))
          (catch Throwable e
            e))))
    

    你会这样读:

    (defn read-from-channel-and-print
      [channel]
      (a/go
        (try
          (loop []
            (let [value (a/<! channel)]
              (when value
                (when (instance? Throwable value)
                  (throw value))
                (println "Value read:" value)
                (recur))))
          (catch Throwable e
            e))))
    

    您现在将有两个通道,因此请使用 alts!alts!! 之类的内容来检查您的循环是否退出。完成后关闭频道。

    【讨论】:

      猜你喜欢
      • 2015-07-11
      • 1970-01-01
      • 1970-01-01
      • 2016-08-25
      • 1970-01-01
      • 2016-12-11
      • 1970-01-01
      • 2019-02-13
      • 1970-01-01
      相关资源
      最近更新 更多