【问题标题】:Blocking Queue - Need more information阻塞队列 - 需要更多信息
【发布时间】:2010-10-27 22:33:35
【问题描述】:

这个问题与我之前的一个问题有关..

Previous Post

在那里,阻塞性被认为是一个优势。

我试图开发一些简单的代码来展示阻塞特性,但我卡住了。我只是尝试制作大小为 4 的 BlockingQueue 并尝试添加 5 个元素并最终得到 java.lang.IllegalStateException。有人可以给我看一个阻止BlockingQueue性质的代码示例吗?


public static void main(String[] args) {
    BlockingQueue<String> bq = new LinkedBlockingQueue<String>(4);

    try {
        bq.offer("A");
        bq.offer("B");
        bq.offer("C");
        bq.offer("D");
        bq.offer("E");

        System.out.println("1 = " + bq.take());
        System.out.println("2 = " + bq.take());
        System.out.println("3 = " + bq.take());
        System.out.println("4 = " + bq.take());
        System.out.println("5 = " + bq.take());
        System.out.println("6 = " + bq.take());
    } catch (Exception e) {
        // TODO: handle exception
        e.printStackTrace();
    }
}

我使用了这个代码段。在这种情况下,我试图将 5 个元素放入大小为 4 的队列中。在这种情况下,应将 4 个元素(A、B、C、D)添加到队列中。然后我在打印时调用take() 方法。当我调用 System.out.println("1 = " + bq.take()); 时,不应该将“E”自动插入队列吗?因为它有一个免费的插槽?

【问题讨论】:

    标签: java data-structures concurrency queue


    【解决方案1】:

    您添加的是addoffer 还是put?我假设您使用的是add,因为它是唯一可以抛出IllegalStateException 的人;但是如果你阅读table,你会发现如果你想要阻塞语义,你应该使用put(和take来删除)。

    编辑:您的示例存在一些问题。

    首先我要回答“为什么我第一次调用take() 时没有插入E?”这个问题?答案是,当您调用take() 时,您已经尝试插入E 失败。一旦空间被释放,就没有什么可插入的了。

    现在,如果您将 offer() 更改为 put()put("E") 将永远不会返回。为什么?因为它正在等待其他线程从队列中移除一个元素。请记住,BlockingQueues 是为多线程访问而设计的。如果您有一个单线程应用程序,阻塞是无用的(实际上比无用更糟糕)。

    这是一个改进的例子:

    public static void main(String[] args) {
        final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(4);
    
        Runnable producer = new Runnable() {
            public void run() {
                try {
                    bq.put("A");
                    bq.put("B");
                    bq.put("C");
                    bq.put("D");
                    bq.put("E");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); 
                }
            }
        };
        Runnable consumer = new Runnable() {
            public void run() {
                try {
                    System.out.println("1 = " + bq.take());
                    System.out.println("2 = " + bq.take());
                    System.out.println("3 = " + bq.take());
                    System.out.println("4 = " + bq.take());
                    System.out.println("5 = " + bq.take());
                    System.out.println("6 = " + bq.take());
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        new Thread(producer).start();
        new Thread(consumer).start();
    }
    

    现在put("E") 调用实际上会成功,因为它现在可以等到消费者线程从队列中删除“A”。最后一个take() 仍然会无限阻塞,因为没有第六个元素要移除。

    【讨论】:

    • 以前我使用添加 + 投票。但目前我正在尝试 put + Take
    • 我应该如何指定 PUT() 的超时时间?
    • Put 没有超时;如果您需要超时,请使用 offer。
    【解决方案2】:

    mmyers 打败了我:P (+1)
    这应该是你需要的,祝你好运!

    注意: put() 在您的示例中将失败,因为 put() 将阻塞直到空间可用。由于空间永远不可用,程序永远不会继续执行。

    ==== 旧答案======

    BlockingQueue 是一个接口,您必须使用其中一个实现类。

    “阻塞性质”只是说明您可以从队列中请求某些内容,如果它为空,则它所在的线程将阻塞(等待),直到将某些内容添加到队列中并且然后继续处理。

    ArrayBlockingQueue
    DelayQueue
    LinkedBlockingQueue
    PriorityBlockingQueue
    SynchronousQueue

    http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

    //your main collection
    LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>();
    
    //Add your values
    lbq.put(100);
    lbq.put(200);
    
    //take() will actually remove the first value from the collection, 
    //or block if no value exists yet.
    //you will either have to interrupt the blocking, 
    //or insert something into the queue for the program execution to continue
    
    int currVal = 0;
    try {
        currVal = lbq.take();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    【讨论】:

    • 谢谢..我使用了“LinkedBlockingQueue”
    • 太棒了,这就是你需要的一切吗?如果您仍然希望我发布它,我现在正在制作一个示例
    • 最好把它贴出来。只是一个简短的例子:)
    • 好吧,这是一个非常简单的语法示例。如果您需要任何其他帮助,请告诉我。祝你好运!
    • 这行不通。如果要阻止则需要使用put,如果无法添加元素则希望返回,则需要使用offer。
    【解决方案3】:

    为了具体回答您的问题:Offer 是一个非阻塞的要约调用,因此在像您发布的那样的单线程方法中,对 offer('E') 的调用仅返回 false 而不修改整个队列。如果您使用阻塞 put('E') 调用,它将休眠直到空间可用。永远在你的简单例子中。您需要有一个单独的线程从队列中读取数据,以便为 put 完成创建空间。

    【讨论】:

    • 用 put 替换 offer 但失败。我需要为 PUT() 项目使用单独的线程吗?
    • 在我的回答中,我说:您需要有一个单独的线程从队列中读取,以便为 put 完成创建空间。
    【解决方案4】:

    你好更多关于这个课程的信息

     /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and throwing an
     * {@code IllegalStateException} if no space is currently available.
     * When using a capacity-restricted queue, it is generally preferable to
     * use {@link #offer(Object) offer}.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if the element cannot be added at this
     *         time due to capacity restrictions
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean add(E e);
    
    /**
     * Inserts the specified element into this queue if it is possible to do
     * so immediately without violating capacity restrictions, returning
     * {@code true} upon success and {@code false} if no space is currently
     * available.  When using a capacity-restricted queue, this method is
     * generally preferable to {@link #add}, which can fail to insert an
     * element only by throwing an exception.
     *
     * @param e the element to add
     * @return {@code true} if the element was added to this queue, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e);
    
    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;
    
    /**
     * Inserts the specified element into this queue, waiting up to the
     * specified wait time if necessary for space to become available.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    
    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    E take() throws InterruptedException;
    
    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-10-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-02-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多