【问题标题】:Thread synchronization with Clojure与 Clojure 的线程同步
【发布时间】:2023-04-01 19:53:01
【问题描述】:

我有一个练习:

  • 按顺序打印从 1 到 100 的所有正整数。

  • 使用块、信号量或其他类似机制(但避免休眠)协调两个线程,以便两个线程的组合输出按数字顺序显示。

     Sample Output
        In thread one: The number is ‘1’
        In thread two: The number is ‘2’
        In thread one: The number is ‘3’
        In thread two: The number is ‘4’
    

该练习适用于 Ruby,但我想向我的班级展示 Clojure 可能是该任务的不错选择。

我对任何语言的线程都没有任何经验,但我想使用类似的东西:

 (def thread_1 (future (swap! my-atom inc) ))
 (def thread_2 (future (swap! my-atom inc) ))

@thread_1 总是返回相同的值。有没有办法在 Clojure 中协调两个线程?

我在 Java 中使用 ReentrantLock 和 Condition 找到了这个 example,现在我正在尝试将它翻译成 Clojure。

【问题讨论】:

  • 顺序重要吗?我的意思是线程应该一个接一个地工作,就像你的例子一样。
  • @fl00r,是的,顺序必须是连续的。

标签: multithreading clojure clojure-java-interop


【解决方案1】:

如果线程的顺序很重要,并且如果您对非经典线程通信感到好奇,您可以选择clojure.core.async 并使用“通道”。

(require '[clojure.core.async :as a])

(let [chan-one (a/chan 1)
      chan-two (a/chan 1)]
  (a/>!! chan-one 1)
  (doseq [[thread in out] [["one" chan-one chan-two]
                           ["two" chan-two chan-one]]]
    (a/go-loop []
      (when-let [n (a/<! in)]
        (if (> n 10)
          (do (a/close! in)
              (a/close! out))
          (do (prn (format "In thread %s: The number is `%s`" thread n))
              (a/>! out (inc n))
              (recur)))))))

输出是

"In thread one: The number is `1`"
"In thread two: The number is `2`"
"In thread one: The number is `3`"
"In thread two: The number is `4`"
"In thread one: The number is `5`"
"In thread two: The number is `6`"
"In thread one: The number is `7`"
"In thread two: The number is `8`"
"In thread one: The number is `9`"
"In thread two: The number is `10`"

这里有个问题是go-routines在线程池中执行,所以它们不是专用线程,如果你想使用真正的线程,你应该这样:

(require '[clojure.core.async :as a])

(let [chan-one (a/chan 1)
      chan-two (a/chan 1)]
  (a/>!! chan-one 1)
  (doseq [[thread in out] [["one" chan-one chan-two]
                           ["two" chan-two chan-one]]]
    (a/thread
      (loop []
        (when-let [n (a/<!! in)]
          (if (> n 10)
            (do (a/close! in)
                (a/close! out))
            (do (prn (format "In thread %s: The number is `%s`" thread n))
                (a/>!! out (inc n))
                (recur))))))))

【讨论】:

    【解决方案2】:

    首先,您看到奇怪结果的原因是您取消引用 future,而不是持有该数字的原子。未来将在取消引用时返回 swap! 的结果。

    其次,可以使用locking(基本上是Java的synchronized),一次只允许增加一个线程并打印:

    (def n-atom (atom 0))
    
    (defn action []
      ; Arbitrarily chose the atom to lock on.
      ; It would probably be better to create a private object that can't be otherwise used.
      (locking n-atom
        (println
          (swap! n-atom inc))))
    
    (defn go []
      ; Have each thread do action twice for a total of four times
      (future (doall (repeatedly 2 action)))
      (future (doall (repeatedly 2 action))))
    
    (go)
    1
    2
    3
    4
    

    我会注意到 future 真的不应该在这里使用。 future 适用于您想要异步计算结果的情况。它会吞噬错误,直到它被取消引用,所以如果您从未@,您将永远不会看到future 中出现的异常。最好使用线程池,或者为了方便使用,借助宏自己启动两个线程:

    (defmacro thread
      "Starts the body in a new thread."
      [& body]
      `(doto (Thread. ^Runnable (fn [] ~@body))
             (.start)))
    
    (defn go []
      (thread (doall (repeatedly 2 action)))
      (thread (doall (repeatedly 2 action))))
    

    【讨论】:

      【解决方案3】:

      这是一种通常使用agent 来协调在同一状态下运行的多个线程的方法:

      (def my-agent (agent 1 :validator #(<= % 100)))
      (future
        (while true
          (send my-agent
                (fn [i]
                  (println "In thread" (.getName (Thread/currentThread))
                           "the number is:" i)
                  (inc i)))))
      In thread clojure-agent-send-pool-4 the number is: 1
      In thread clojure-agent-send-pool-5 the number is: 2
      In thread clojure-agent-send-pool-5 the number is: 3
      In thread clojure-agent-send-pool-4 the number is: 4
      In thread clojure-agent-send-pool-4 the number is: 5
      In thread clojure-agent-send-pool-4 the number is: 6
      

      无论有没有future,您都会在此处看到相同的基本行为,因为send 会立即在内部循环中返回,并且sent 函数可能会在池中的不同线程上执行.代理负责协调对共享状态的访问。

      更新:这是另一种不涉及:validator 函数或异常终止的方法:

      (def my-agent (agent (range 1 101)))
      (while (seq @my-agent)
        (send
          my-agent
          (fn [[n & ns]]
            (when n
              (println "In thread" (.getName (Thread/currentThread))
                       "the number is:" n)
              ns))))
      

      【讨论】:

      • 这里的信息没有错,但是……使用validator进行普通终止?让未来因普通终止而崩溃?嗯,这个例子可以改进。
      • @glts 公平点!查看更新的答案,如果您能想到其他改进,请告诉我。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-03-18
      • 2015-09-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多