【问题标题】:Producer Consumer in Java using threads never terminatesJava中使用线程的生产者消费者永远不会终止
【发布时间】:2016-06-03 20:47:30
【问题描述】:

我有一个生产者-消费者问题要在 Java 中实现,我希望生产者线程运行特定的时间,例如1 天,将对象放入 BlockingQueue - 特别是推文,通过 Twitter4j 从 Twitter Streaming API 流式传输 - 以及消费者线程从队列中消费这些对象并将它们写入文件。我使用了来自Read the 30Million user id's one by one from the big file 的PC 逻辑,其中生产者是FileTask,消费者是CPUTask(检查第一个答案;我的方法使用相同的迭代/try-catch blocks)。当然,我相应地调整了实现。 我的主要功能是:

public static void main(String[] args) {
    ....

    final int threadCount = 2;

    // BlockingQueue with a capacity of 200
    BlockingQueue<Tweet> tweets = new ArrayBlockingQueue<>(200);

    // create thread pool with given size
    ExecutorService service = Executors.newFixedThreadPool(threadCount);

    Future<?> f = service.submit(new GathererTask(tweets));
    try {
        f.get(1,TimeUnit.MINUTES); // Give specific time to the GathererTask
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        f.cancel(true); // Stop the Gatherer
    }

    try {
        service.submit(new FileTask(tweets)).get(); // Wait til FileTask completes
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

    service.shutdownNow();

   try {
        service.awaitTermination(7, TimeUnit.DAYS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

现在,问题在于,尽管它确实流式传输推文并将它们写入文件,但它永远不会终止并且永远不会到达f.cancel(true) 部分。我应该改变什么才能使其正常工作?另外,您能否在回答中解释线程逻辑出了什么问题,所以我从错误中吸取教训?提前谢谢你。

这些是我的 PC 类的 run() 函数:

制作人:

@Override
    public void run() {
        StatusListener listener = new StatusListener(){
            public void onStatus(Status status) {
                try {
                    tweets.put(new Tweet(status.getText(),status.getCreatedAt(),status.getUser().getName(),status.getHashtagEntities()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentTread.interrupt(); // Also tried this command
            }
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }
    };
    twitterStream.addListener(listener);
    ... // More Twitter4j commands
}

消费者:

public void run() {
    Tweet tweet;
    try(PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter("out.csv", true)))) {
        while(true) {
            try {
                // block if the queue is empty
                tweet = tweets.take();
                writeTweetToFile(tweet,out);

            } catch (InterruptedException ex) {
                break; // GathererTask has completed
            }
        }
        // poll() returns null if the queue is empty
        while((tweet = tweets.poll()) != null) {
            writeTweetToFile(tweet,out);
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
}

【问题讨论】:

    标签: java multithreading twitter infinite-loop threadpoolexecutor


    【解决方案1】:

    您应该检查您的 Thread 类是否正在处理 InterruptedException,如果没有,它们将永远等待。 This 可能会有所帮助。

    【讨论】:

    • 我编辑了我的问题,所以现在它还包括 PC 运行方法。
    • 这些对我来说看起来不错,我认为您需要使用 jstack 进行线程转储并查看线程挂起的位置。您可以找到快速说明middlewaremagic.com/weblogic/?p=2281
    猜你喜欢
    • 2018-07-11
    • 2012-04-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-17
    • 1970-01-01
    • 2013-11-10
    相关资源
    最近更新 更多