【问题标题】:How do you block a thread until a condition becomes true?在条件变为真之前如何阻塞线程?
【发布时间】:2014-04-16 03:41:49
【问题描述】:

在 Clojure 中,如何阻塞线程(未来)直到条件变为真?或者,或者,也许继续重试直到条件变为真?当您有条件变量时,这很容易,但我不确定 Clojure 的方法是什么。

更具体地说,我有一个共享变量,许多期货可以同时访问它。未来应该做到以下几点:

  1. 检查变量的状态。
  2. 如果状态满足特定条件,则将其更新为新状态。
  3. 如果状态不满足条件,future 应该阻塞或重试,直到满足条件(由另一个线程修改状态)。

【问题讨论】:

  • 我碰巧查看了#clojure 日志,看来您实际上对共享资源由使用它的线程独占,然后返回到“可用”的场景感兴趣" 一旦当前线程不再需要它,下一个线程将拾取它的状态。您可以使用 core.async 使用具有大小为 1 的缓冲区的通道来执行此操作: (1) 创建通道 -- (chan 1),将其放在适当的位置 -- 并将资源放在通道上; (2) 使用(<!! the-channel) 请求对资源的独占保留(阻塞); (3) 完成后,用>!!返回资源。

标签: concurrency clojure


【解决方案1】:

Java 平台支持条件变量,请参阅java.util.concurrent.locks.Condition 的文档。

上面页面中的示例很容易翻译成 Clojure:

;;; based on the example in java.util.concurrent.locks.Condition
;;; documentation for JDK 1.7, see the link above

(defprotocol PBoundedBuffer
  (-put [buf x])
  (-take [buf]))

(import (java.util.concurrent.locks ReentrantLock Condition))

(deftype BoundedBuffer [^ReentrantLock lock
                        ^Condition not-full?
                        ^Condition not-empty?
                        ^objects items
                        ^:unsynchronized-mutable ^int putptr
                        ^:unsynchronized-mutable ^int takeptr
                        ^:unsynchronized-mutable ^int cnt]
  PBoundedBuffer
  (-put [buf x]
    (.lock lock)
    (try
      (while (== cnt (alength items))
        (.await not-full?))
      (aset items putptr x)
      (set! putptr (unchecked-inc-int putptr))
      (if (== putptr (alength items))
        (set! putptr (int 0)))
      (set! cnt (unchecked-inc-int cnt))
      (.signal not-empty?)
      (finally
        (.unlock lock))))

  (-take [buf]
    (.lock lock)
    (try
      (while (zero? cnt)
        (.await not-empty?))
      (let [x (aget items takeptr)]
        (set! takeptr (unchecked-inc-int takeptr))
        (if (== takeptr (alength items))
          (set! takeptr (int 0)))
        (set! cnt (unchecked-dec-int cnt))
        (.signal not-full?)
        x)
      (finally
        (.unlock lock)))))

(defn bounded-buffer [capacity]
  (let [lock (java.util.concurrent.locks.ReentrantLock.)]
    (BoundedBuffer. lock
                    (.newCondition lock)
                    (.newCondition lock)
                    (object-array capacity)
                    0
                    0
                    0)))

REPL 的试驾:

(def bb (bounded-buffer 3))

(-put bb 1)
(-put bb 2)
(-put bb 3)

(future (-put bb 4) (println :foo))

(-take bb)

根据需要,future 阻塞,然后在最终调用 -take 后打印 :foo

【讨论】:

    【解决方案2】:

    Clojure 为此类问题提供了refsagentsatoms,听起来如果您的问题没有任何副作用,您可以使用 refs。

    【讨论】:

      【解决方案3】:

      您可以将您要检查的条件或值存储在代理中,并添加一个手表,一旦条件变为true 或该值是您想要的值,就会执行所需的操作:

      (def x (agent 0))
      
      (defn do-this-once-x-is-10 [key agnt old-val new-val]
        (when (= new-val 10)
          (println "x is now 10")))
      
      (add-watch x :print-alert do-this-once-x-is-10)
      
      (dotimes [_ 10]
        (Thread/sleep 1000)
        (send x inc))
      
      ; the value stored in x is incremented every second; after 10 seconds,
      ; the value of x will equal 10 and "x is now 10" will print
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-07-19
        • 2017-10-06
        • 2022-01-08
        • 2017-07-02
        • 1970-01-01
        • 2011-08-25
        相关资源
        最近更新 更多