【问题标题】:Passing a work item between threads (Java)在线程之间传递工作项 (Java)
【发布时间】:2009-02-06 16:00:44
【问题描述】:

我有两个线程。生产者正在生产数据片段(字符串对象),消费者在其中处理这些字符串。问题是我的应用程序只需要处理最新的数据对象。换句话说,如果生产者设法产生两个字符串“s1”,然后是“s2”,那么我希望消费者只处理“s2”。 “s1”可以安全地丢弃。

当然,实现实现这种行为的类没有问题,但我想使用 java.util.concurrent 中的标准机制(如果存在这样的机制)。请注意,SynchronousQueue 不是一个好的解决方案:消费者在排队“s1”时会阻塞,并且没有机会产生“s2”。

(总之,我在找一个带有阻塞删除操作和非阻塞集合操作的单元素集合)

有什么想法吗?

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    我认为您的最佳答案可能是使用 ArrayBlockingQueue,其中生产者(您只有一个生产者,对吗?)在添加新元素之前删除任何现有元素。

    当然,这个实现中存在竞争条件:消费者可以在生产者删除元素之前开始处理它。但无论您使用何种数据结构,这些竞争条件将始终存在。

    【讨论】:

    • 真的有竞态条件吗?我认为,ArrayBlockingQueue 中的 ReentrantLock 是为了避免它们。
    • @Errandir - 我所指的竞争条件是队列中有一个元素在等待,而消费者在生产者添加新元素之前获取该元素。由于没有更好的术语,这是在您使用的任何数据结构之外发生的“宏观竞赛”。
    【解决方案2】:

    Exchanger 类呢?这是线程之间交换对象的标准方式。用你的类专门化它,可能是一个字符串列表。让消费者只使用第一个/最后一个。

    【讨论】:

    • 可能比我的方法更好,尽管您确实需要将生产者的超时设置为 0
    • Exchanger 也阻止了 producer :-(
    • 除非您将超时设置为 0(或负数)
    【解决方案3】:

    您可以为此使用大小为 1 的数组:

    String[] oeq = new String[1];
    

    示例来源:

    public class Test {
        private static final String[] oeq = new String[1];
        public static void main(String[] args) {
            (new Producer()).start();
            (new Consumer()).start();
            (new Consumer()).start();
            (new Consumer()).start();
            (new Consumer()).start();
            (new Consumer()).start();
            (new Consumer()).start();
        }
    
        private static class Producer extends Thread {
            public void run() {
                int i=0;
                while(true) {
                    i++;
                    synchronized(oeq) {
                        oeq[0] = ""+i;
                        oeq.notifyAll();
                    }
                }
            }
        }
    
        private static class Consumer extends Thread {
            public void run() {
                String workload = null;
                while(true) {
                    synchronized(oeq) {
                        try {
                            oeq.wait();
                        } catch(InterruptedException ie) {
                            ie.printStackTrace();
                        }
                        if(oeq[0] != null) {
                            workload = oeq[0];
                            oeq[0] = null;
                        }
                    }
                    if(workload != null) {
                        System.out.println(workload);
                    }
                }
            }
        }
    }
    

    【讨论】:

    • 这将非常低效。消费者线程在等待工作时不应阻塞 CPU。
    • 其实这里的consumer会消耗掉所有的CPU,因为循环中没有wait()(这可能就是你所说的“阻塞”)。这就是为什么,我认为,OP 想要使用现有的 JDK 类——编写一个损坏的本地并发对象很容易。
    • 好的,到了本地并发对象被破坏的地步:你为什么要扩展线程?
    • 实现 Runnable 可能会更好。这是一个快速示例
    • 或者你还有什么意思?
    【解决方案4】:

    好吧,如果您只想要最近生成的字符串,那么您根本不需要队列 - 您只需要一个字符串引用:生产者设置它,消费者读取它。如果消费者花了很长时间阅读它以至于生产者重新设置它......那又怎样?

    设置和读取引用是原子的。唯一的问题是,如果您希望以某种方式通知消费者有可用的字符串。但即便如此......如果消费者正在做一些需要一段时间的事情,那么你真的不需要并发库中的任何花哨的东西。

    注意,顺便说一句,此示例适用于任意数量的生产者和/或消费者线程。

    import java.util.Random;
    
    public class Example {
        public static void main(String[] av) {
            new Example().go();
        }
    
        Object  mutex       = new Object();
        String  theString   = null;
    
        void go() {
            Runnable producer = new Runnable() {
                public void run() {
                    Random rnd = new Random();
                    try {
                        for (;;) {
                            Thread.sleep(rnd.nextInt(10000));
                            synchronized (mutex) {
                                theString = "" + System.currentTimeMillis();
                                System.out.println("Producer: Setting string to " + theString);
                                mutex.notify();
                            }
                        }
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
    
                }
            };
    
            Runnable consumer = new Runnable() {
                public void run() {
                    try {
                        String mostRecentValue = null;
                        Random rnd = new Random();
                        for (;;) {
                            synchronized (mutex) {
                                // we use == because the producer
                                // creates new string
                                // instances
                                if (theString == mostRecentValue) {
                                    System.out.println("Consumer: Waiting for new value");
                                    mutex.wait();
                                    System.out.println("Consumer: Producer woke me up!");
                                } else {
                                    System.out.println("Consumer: There's a new value waiting for me");
                                }
                                mostRecentValue = theString;
                            }
                            System.out.println("Consumer: processing " + mostRecentValue);
                            Thread.sleep(rnd.nextInt(10000));
                        }
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            };
    
    
            new Thread(producer).start();
            new Thread(consumer).start();
        }
    }
    

    【讨论】:

    • 您需要将引用设为 volatile,并在 1.5+ JDK 中运行,以保证这一点(Java 内存模型允许线程无限期地维护自己的副本)
    猜你喜欢
    • 1970-01-01
    • 2014-10-28
    • 1970-01-01
    • 2016-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-12
    相关资源
    最近更新 更多