【问题标题】:How to interrupt a BlockingQueue which is blocking on take()?如何中断在 take() 上阻塞的 BlockingQueue?
【发布时间】:2010-10-23 04:15:52
【问题描述】:

我有一个类,它从BlockingQueue 获取对象并通过在连续循环中调用take() 来处理它们。在某个时候,我知道不会有更多的对象被添加到队列中。如何中断take() 方法使其停止阻塞?

这是处理对象的类:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

下面是使用这个类来处理对象的方法:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}

【问题讨论】:

    标签: java concurrency blockingqueue


    【解决方案1】:

    如果中断线程不是一种选择,另一种方法是在队列上放置一个“标记”或“命令”对象,MyObjHandler 将识别为这样并跳出循环。

    【讨论】:

    • 这也称为“毒丸关闭”方法,在“Java 并发实践”中进行了详细讨论,特别是第 155-156 页。
    【解决方案2】:
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
    MyObjectHandler handler = new MyObjectHandler(queue);
    Thread thread = new Thread(handler);
    thread.start();
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
      queue.put(i.next());
    }
    thread.interrupt();
    

    但是,如果您这样做,线程可能会在队列中仍有项目等待处理时被中断。您可能需要考虑使用poll 而不是take,这将允许处理线程超时并在它等待一段时间没有新输入时终止。

    【讨论】:

    • 是的,如果线程在队列中仍有项目时被中断,则会出现问题。为了解决这个问题,我添加了代码以确保在中断线程之前队列为空:while (queue.size()&gt;0) Thread.currentThread().sleep(5000);跨度>
    • @MCS - 请注意,未来的访问者您的方法是一种 hack,不应在生产代码中复制,原因有以下三个。总是首选找到一种实际的方法来挂钩关机。使用Thread.sleep() 作为正确钩子的替代品是绝对不能接受的。在其他实现中,其他线程可能会将事物放入队列中,而 while 循环可能永远不会结束。
    • 谢天谢地,没有必要依赖这样的黑客攻击,因为如果需要,在中断之后处理它也很容易。例如,一个“详尽的”take() 实现可能看起来像:try { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; } 但是,没有理由需要在这一层实现它,并且稍微更高一点的实现会导致更高效的代码(例如通过避免每个元素 InterruptedException 和/或使用 BlockingQueue.drainTo())。
    【解决方案3】:

    很晚了,但希望这对其他人也有帮助我遇到了类似的问题,并使用了erickson above 建议的poll 方法,并进行了一些小改动,

    class MyObjHandler implements Runnable 
    {
        private final BlockingQueue<MyObj> queue;
        public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
        public MyObjHandler(BlockingQueue queue) 
        {
            this.queue = queue;
            Finished = false;
        }
        @Override
        public void run() 
        {        
            while (true) 
            {
                try 
                {
                    MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                    if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                    {
                        // process obj here
                        // ...
                    }
                    if(Finished && queue.isEmpty())
                        return;
    
                } 
                catch (InterruptedException e) 
                {                   
                    return;
                }
            }
        }
    }
    
    public void testHandler() 
    {
        BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 
    
        MyObjHandler  handler = new MyObjHandler(queue);
        new Thread(handler).start();
    
        // get objects for handler to process
        for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
        {
            queue.put(i.next());
        }
    
        // what code should go here to tell the handler to stop waiting for more objects?
        handler.Finished = true; //THIS TELLS HIM
        //If you need you can wait for the termination otherwise remove join
        myThread.join();
    }
    

    这两个问题都解决了

    1. 标记了BlockingQueue,这样它就知道不必再等待元素了
    2. 中间没有中断,因此仅当队列中的所有项目都已处理且没有剩余要添加的项目时,处理块才会终止

    【讨论】:

    • 使Finished 变量volatile 保证线程之间的可见性。见stackoverflow.com/a/106787
    • 如果我没记错的话,队列中的最后一个元素不会被处理。当您从队列中取出最后一个元素时,finished 为真且队列为空,因此它将在处理完最后一个元素之前返回。添加第三个条件 if (Finished && queue.isEmpty() && obj == null)
    • @MattR 谢谢,正确建议我将编辑发布的答案
    【解决方案4】:

    中断线程:

    thread.interrupt()
    

    【讨论】:

      【解决方案5】:

      还是别打断了,太恶心了。

          public class MyQueue<T> extends ArrayBlockingQueue<T> {
      
              private static final long serialVersionUID = 1L;
              private boolean done = false;
      
              public ParserQueue(int capacity) {  super(capacity); }
      
              public void done() { done = true; }
      
              public boolean isDone() { return done; }
      
              /**
               * May return null if producer ends the production after consumer 
               * has entered the element-await state.
               */
              public T take() throws InterruptedException {
                  T el;
                  while ((el = super.poll()) == null && !done) {
                      synchronized (this) {
                          wait();
                      }
                  }
      
                  return el;
              }
          }
      
      1. 生产者将对象放入队列时,调用queue.notify(),如果结束,调用queue.done()
      2. 循环 while (!queue.isDone() || !queue.isEmpty())
      3. test take() 返回 null 值

      【讨论】:

      • 我会说以前的解决方案比这个更干净,更简单
      • 完成标志与毒丸相同,只是给药方式不同:)
      • 清洁剂?我对此表示怀疑。您不知道线程何时准确中断。或者让我问,什么是更清洁的?它的代码更少,这是事实。
      猜你喜欢
      • 1970-01-01
      • 2014-02-08
      • 1970-01-01
      • 1970-01-01
      • 2012-01-05
      • 1970-01-01
      • 2017-04-16
      • 2014-06-16
      • 2013-03-25
      相关资源
      最近更新 更多