【问题标题】:Clojure message handling / async, multithreadedClojure 消息处理/异步,多线程
【发布时间】:2012-10-04 00:01:30
【问题描述】:

我有一个小的 Clojure 消费者/发布者接收消息,处理它们并将它们发送给其他消费者,所有这些都通过 RabbitMQ。

我已经定义了一个消息处理程序,它在一个单独的线程(与主线程分开)中处理消息。 从下面的代码中可以看出,线程同步接收和发送消息,这一切都发生在 lcm/subscribe 函数启动的事件循环中。

那么,问题是,创建这些同步消息处理程序的 N 大小线程池的“Clojure 方式”是什么?我猜非 Clojure 的方式是通过 Java 互操作手动生成多个线程。

另外,考虑到处理不是非常占用 CPU 资源,这是否会加快消息处理速度?让这些消息处理程序异步会更好吗?再次考虑到发布时间比处理时间要多?

最后,我将如何衡量这些竞争方法的性能(我来自 Ruby/Javascript 世界,那里没有任何多线程)?

注意: 我知道这一切都可以通过水平扩展和产生更多监听消息总线的 JVM 进程来避免,但由于该应用程序将部署在 Heroku 上,我想在每个测功机/进程中使用尽可能多的资源.

(defn message-handler
  [ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message)))

(defn -main
  [& args]
  (let [conn          (rmq/connect {:uri (System/getenv "MSGQ")})
        ch            (lch/open conn)
        q-name        "q.events.tagger"
        e-sub-name    "e.events.preproc"
        e-pub-name    "e.events"
        routing-key   "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name message-handler :auto-ack true))))))

更基本的说明...我将如何重构此代码以支持使用附加参数注册消息处理程序回调,如下所示:

    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))

然后发布参考:

    (lb/publish ch pub-name "" processed-message)))

而不是文字:

    (lb/publish ch "e.events" "" processed-message)))

【问题讨论】:

    标签: multithreading asynchronous clojure io idioms


    【解决方案1】:

    对于问题的第二部分,你可以使用如下所示的部分应用:

    (defn message-handler
      [pub-name ch metadata ^bytes payload]
      (let [msg (json/parse-string (String. payload "UTF-8"))
            processed-message (process msg)] 
        (lb/publish ch pub-name "" processed-message)))
    
    
    
    (.start 
      (Thread. 
         (fn []
           (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
    

    【讨论】:

      【解决方案2】:

      这是一个非常大的话题,您可以考虑将这个问题分解为几个不同的问题,但简洁的答案是:use agents

      【讨论】: