【问题标题】:Do the BlockingQueue methods always throw an InterruptedException when the thread is interrupted?当线程被中断时,BlockingQueue 方法是否总是抛出 InterruptedException?
【发布时间】:2012-02-13 00:24:10
【问题描述】:

在我的一个 Java 6 应用程序中,我有一个线程向主线程提供数据,同时还从数据库中预取更多记录。它使用ArrayBlockingQueue 队列作为 FIFO 缓冲区,其主循环大致如下:

while (!Thread.interrupted()) {
    if (source.hasNext()) {
        try {
            queue.put(source.next())
        } catch (InterruptedException e) {
            break;
        }
    } else {
        break;
    }
}

有一些代码会在循环终止后进行一些清理工作,例如毒化队列并释放任何资源,但这几乎就是全部。

就目前而言,从主线程馈线线程没有直接通信:馈线线程使用适当的选项设置,然后使用阻塞队列自行离开控制数据流。

当队列已满时主线程需要关闭feeder时出现问题。由于没有直接控制通道,shutdown方法使用Thread接口到interrupt()feeder线程。不幸的是,在大多数情况下,尽管被中断,但馈线线程仍被阻止在 put() 中 - 不会引发异常。

通过对interrupt() 文档和队列实现源代码的简要阅读,在我看来put() 经常阻塞而不使用JVM 的任何可中断设施。更具体地说,在我当前的 JVM(OpenJDK 1.6b22)上,它阻塞了 sun.misc.Unsafe.park() 本机方法。也许它使用自旋锁或其他东西,但无论如何,这似乎属于the following case

如果前面的条件都不成立,则设置该线程的中断状态。

设置了一个状态标志,但线程仍然在put() 中被阻塞并且不会进一步迭代以便可以检查该标志。结果?一个不会死的僵尸线程

  1. 我对这个问题的理解是否正确,还是我遗漏了什么?

  2. 解决此问题的可能方法是什么?目前我只能想到两种解决方案:

    一个。在队列中多次调用poll() 以解除对馈线线程的阻塞:就我所见,丑陋且不太可靠,但它大部分有效。

    b.使用带有超时的offer() 方法而不是put() 以允许线程在可接受的时间范围内检查其中断状态。

除非我遗漏了什么,否则这是对 Java 中 BlockingQueue 实现的一些未充分记录的警告。 似乎在文档时有一些迹象,例如建议毒化队列以关闭工作线程,但我找不到任何明确的参考。

编辑:

好的,上面的解决方案 (a) 有一个更,呃,剧烈的变化:ArrayBlockingQueue.clear()。我认为这应该总是有效的,即使它不完全是优雅的定义......

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    我认为您的问题有两个可能的原因。

    1. The Law of the Sabotaged Doorbell 中所述,您可能没有正确处理中断。在那里你会发现:

      调用可能引发InterruptedException的代码怎么办?不要立即拉出电池!这个问题通常有两个答案:

      从您的方法中重新抛出 InterruptedException。 这通常是最简单和最好的方法。它被新的 java.util.concurrent.* 包使用,这解释了为什么我们现在经常接触到这个异常。
      抓住它,设置中断状态,返回。如果你在调用可能导致异常的代码的循环中运行,您应该将状态设置回被中断。

      例如:

      while (!Thread.currentThread().isInterrupted()) {
          // do something
          try {
              TimeUnit.SECONDS.sleep(1000);
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              break;
          }
      }
      
    2. source.hasNext()source.next() 正在消耗和丢弃中断状态。请参阅下面的添加了解我如何解决此问题。

    我相信在ArrayBlockingqueue.put() 处中断线程一种有效的解决方案。

    已添加

    我使用CloseableBlockingQueue 解决了问题2,它可以从阅读器端关闭。这样一来,一旦关闭,所有put 调用都会快捷方式。然后您可以从作者那里检查队列的closed 标志。

    // A blocking queue I can close from the pull end. 
    // Please only use put because offer does not shortcut on close.
    // <editor-fold defaultstate="collapsed" desc="// Exactly what it says on the tin.">
    class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
      // Flag indicates closed state.
      private volatile boolean closed = false;
      // All blocked threads. Actually this is all threads that are in the process
      // of invoking a put but if put doesn't block then they disappear pretty fast.
      // NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
      private final Container<Thread> blocked;
    
      // Limited size.
      public CloseableBlockingQueue(int queueLength) {
        super(queueLength);
        blocked = new Container<Thread>(queueLength);
      }
    
      /**
       * *
       * Shortcut to do nothing if closed.
       *
       * Track blocked threads.
       */
      @Override
      public void put(E e) throws InterruptedException {
        if (!closed) {
          Thread t = Thread.currentThread();
          // Hold my node on the stack so removal can be trivial.
          Container.Node<Thread> n = blocked.add(t);
          try {
            super.put(e);
          } finally {
            // Not blocked anymore.
            blocked.remove(n, t);
          }
        }
      }
    
      /**
       *
       * Shortcut to do nothing if closed.
       */
      @Override
      public E poll() {
        E it = null;
        // Do nothing when closed.
        if (!closed) {
          it = super.poll();
        }
        return it;
      }
    
      /**
       *
       * Shortcut to do nothing if closed.
       */
      @Override
      public E poll(long l, TimeUnit tu) throws InterruptedException {
        E it = null;
        // Do nothing when closed.
        if (!closed) {
          it = super.poll(l, tu);
        }
        return it;
      }
    
      /**
       *
       * isClosed
       */
      boolean isClosed() {
        return closed;
      }
    
      /**
       *
       * Close down everything.
       */
      void close() {
        // Stop all new queue entries.
        closed = true;
        // Must unblock all blocked threads.
    
        // Walk all blocked threads and interrupt them.
        for (Thread t : blocked) {
          //log("! Interrupting " + t.toString());
          // Interrupt all of them.
          t.interrupt();
        }
      }
    
      @Override
      public String toString() {
        return blocked.toString();
      }
    }
    

    您还需要无锁的Container 和 O(1) put/get(尽管它不是严格意义上的集合)。它在幕后使用Ring

    public class Container<T> implements Iterable<T> {
    
      // The capacity of the container.
      final int capacity;
      // The list.
      AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
    
      // Constructor
      public Container(int capacity) {
        this.capacity = capacity;
        // Construct the list.
        Node<T> h = new Node<T>();
        Node<T> it = h;
        // One created, now add (capacity - 1) more
        for (int i = 0; i < capacity - 1; i++) {
          // Add it.
          it.next = new Node<T>();
          // Step on to it.
          it = it.next;
        }
        // Make it a ring.
        it.next = h;
        // Install it.
        head.set(h);
      }
    
      // Empty ... NOT thread safe.
      public void clear() {
        Node<T> it = head.get();
        for (int i = 0; i < capacity; i++) {
          // Trash the element
          it.element = null;
          // Mark it free.
          it.free.set(true);
          it = it.next;
        }
        // Clear stats.
        resetStats();
      }
    
      // Add a new one.
      public Node<T> add(T element) {
        // Get a free node and attach the element.
        return getFree().attach(element);
      }
    
      // Find the next free element and mark it not free.
      private Node<T> getFree() {
        Node<T> freeNode = head.get();
        int skipped = 0;
        // Stop when we hit the end of the list 
        // ... or we successfully transit a node from free to not-free.
        while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
          skipped += 1;
          freeNode = freeNode.next;
        }
        if (skipped < capacity) {
          // Put the head as next.
          // Doesn't matter if it fails. That would just mean someone else was doing the same.
          head.set(freeNode.next);
        } else {
          // We hit the end! No more free nodes.
          throw new IllegalStateException("Capacity exhausted.");
        }
        return freeNode;
      }
    
      // Mark it free.
      public void remove(Node<T> it, T element) {
        // Remove the element first.
        it.detach(element);
        // Mark it as free.
        if (!it.free.compareAndSet(false, true)) {
          throw new IllegalStateException("Freeing a freed node.");
        }
      }
    
      // The Node class. It is static so needs the <T> repeated.
      public static class Node<T> {
    
        // The element in the node.
        private T element;
        // Are we free?
        private AtomicBoolean free = new AtomicBoolean(true);
        // The next reference in whatever list I am in.
        private Node<T> next;
    
        // Construct a node of the list
        private Node() {
          // Start empty.
          element = null;
        }
    
        // Attach the element.
        public Node<T> attach(T element) {
          // Sanity check.
          if (this.element == null) {
            this.element = element;
          } else {
            throw new IllegalArgumentException("There is already an element attached.");
          }
          // Useful for chaining.
          return this;
        }
    
        // Detach the element.
        public Node<T> detach(T element) {
          // Sanity check.
          if (this.element == element) {
            this.element = null;
          } else {
            throw new IllegalArgumentException("Removal of wrong element.");
          }
          // Useful for chaining.
          return this;
        }
    
        @Override
        public String toString() {
          return element != null ? element.toString() : "null";
        }
      }
    
      // Provides an iterator across all items in the container.
      public Iterator<T> iterator() {
        return new UsedNodesIterator<T>(this);
      }
    
      // Iterates across used nodes.
      private static class UsedNodesIterator<T> implements Iterator<T> {
        // Where next to look for the next used node.
    
        Node<T> it;
        int limit = 0;
        T next = null;
    
        public UsedNodesIterator(Container<T> c) {
          // Snapshot the head node at this time.
          it = c.head.get();
          limit = c.capacity;
        }
    
        public boolean hasNext() {
          if (next == null) {
            // Scan to the next non-free node.
            while (limit > 0 && it.free.get() == true) {
              it = it.next;
              // Step down 1.
              limit -= 1;
            }
            if (limit != 0) {
              next = it.element;
            }
          }
          return next != null;
        }
    
        public T next() {
          T n = null;
          if ( hasNext () ) {
            // Give it to them.
            n = next;
            next = null;
            // Step forward.
            it = it.next;
            limit -= 1;
          } else {
            // Not there!!
            throw new NoSuchElementException ();
          }
          return n;
        }
    
        public void remove() {
          throw new UnsupportedOperationException("Not supported.");
        }
      }
    
      @Override
      public String toString() {
        StringBuilder s = new StringBuilder();
        Separator comma = new Separator(",");
        // Keep counts too.
        int usedCount = 0;
        int freeCount = 0;
        // I will iterate the list myself as I want to count free nodes too.
        Node<T> it = head.get();
        int count = 0;
        s.append("[");
        // Scan to the end.
        while (count < capacity) {
          // Is it in-use?
          if (it.free.get() == false) {
            // Grab its element.
            T e = it.element;
            // Is it null?
            if (e != null) {
              // Good element.
              s.append(comma.sep()).append(e.toString());
              // Count them.
              usedCount += 1;
            } else {
              // Probably became free while I was traversing.
              // Because the element is detached before the entry is marked free.
              freeCount += 1;
            }
          } else {
            // Free one.
            freeCount += 1;
          }
          // Next
          it = it.next;
          count += 1;
        }
        // Decorate with counts "]used+free".
        s.append("]").append(usedCount).append("+").append(freeCount);
        if (usedCount + freeCount != capacity) {
          // Perhaps something was added/freed while we were iterating.
          s.append("?");
        }
        return s.toString();
      }
    }
    

    【讨论】:

    • 我稍微放弃了这种可能性,因为 feeder 线程在put() 中等待的时间要多得多。然而,这实际上听起来很合理。 source 对象属于第三方数据库相关库 - 使用所有网络代码,必须在某处抛出 InterruptedException,但顶级方法不会抛出它们......叹息,我讨厌挖掘第三方代码...
    • 哦,为了大声哭泣......编写这个库的人让它吞噬了每一个抛出的 InterruptedException!每一个!这样的代码谁写的?
    • 顺便说一句,将线程状态设置回中断就这个特定线程而言没有任何改变 - 一旦循环中断了直接导致终止的路径。无论如何,我没有任何异常需要处理......
    • 我已经添加了我的CloseablBlockingQueue 代码。只要您在进纸线程中检查它的closed 状态,一切都应该很好。在close 上,如果线程在put 上被阻塞,它将获得中断。如果不是,它将默默地消耗所有puts,直到您注意到它已关闭。
    • 我会 +1 并接受这个答案,以指出我已经放弃的可能性以及代码形式的包罗万象的解决方案 :-) 我不完全相信第三方图书馆对我所看到的内容负全部责任,但我真的没有时间进一步调查。
    【解决方案2】:
    私有 AtomicBoolean 关闭 = new AtomicBoolean(); 无效关机() { 关机。设置(真); } 而(!shutdown.get()){ if (source.hasNext()) { 对象项 = source.next(); while (!shutdown.get() && !queue.offer(item, 100, TimeUnit.MILLISECONDS)) { 继续; } } 别的 { 休息; } }

    【讨论】:

    • 1.我的代码中的 Thread.interrupted() 调用应该无关紧要 - 当它返回时,线程已经在终止的路上了。
    • 2.接得好!在简化 StackOverflow 帖子的代码时,我错过了 break 声明。我已将其编辑回问题。
    • 3.我确实想在主线程终止之前很久就停止馈线线程。它不仅不再有用,而且会消耗大量资源——至少是一个数据库连接和一个阻止 GC 清除队列的引用。
    • 主线程是否可以在馈线线程中设置某种关闭标志?你说没有直接的沟通渠道,但能有吗?我建议仅通过中断来控制线程的生命周期是糟糕的设计,并且正如您所见,可能会受到您无法控制的第三方代码的干扰。
    • 这里不需要AtomicBoolean,因为它只设置为true。将其设为 volatile 就足够了。
    猜你喜欢
    • 2011-05-15
    • 2012-04-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多