【问题标题】:Simple asynchronous I/O: many threads, one file简单的异步 I/O:多线程,一个文件
【发布时间】:2011-10-02 00:22:09
【问题描述】:

我有一个科学应用程序,我通常与xargs 并行运行,但是这种方案会产生重复的 JVM 启动成本,并且忽略了缓存文件 I/O 和 JIT 编译器。我已经修改了代码以使用线程池,但我一直不知道如何保存我的输出。

程序(即新程序的一个线程)读取两个文件,进行一些处理,然后将结果打印到标准输出。目前,我通过让每个线程将其结果字符串添加到BlockingQueue 来处理输出。只要布尔标志为真,另一个线程从队列中取出并写入文件。然后我awaitTermination并将flag设置为false,触发文件关闭和程序退出。

我的解决方案似乎有点笨拙;实现这一目标的最简单和最好的方法是什么? 我应该如何将来自多个线程的主要结果数据写入单个文件?

如果答案是广泛适用的方法,则不需要特定于 Java。

更新

我正在使用“STOP”作为毒丸。

while (true) {
    String line = queue.take();
    if (line.equals("STOP")) {
        break;
    } else {
        output.write(line);
    }
}
output.close();

我手动启动队列消费线程,然后将作业添加到线程池中,等待作业完成,最后毒化队列并加入消费者线程。

【问题讨论】:

  • 如果使用这样的标志,如果是static boolean字段,一定要声明为volatile!否则它将不起作用,除非您将同步添加到该字段的读/写(您也可以使用static final AtomicBoolean)。 Anyawys,我会坚持使用 Will Hartung 所描述的“毒丸”模式。

标签: java multithreading file-io asynchronous parallel-processing


【解决方案1】:

这确实是您想要的方式,让线程将其输出放入队列,然后让编写器将其耗尽。

您可能想做的唯一一件事就是让事情变得更简洁,而不是检查标志,只需将“全部完成”标记放在队列中,作者可以使用它来知道它已经完成。这样就不需要带外信号了。

这很简单,您可以使用众所周知的字符串、枚举或简单的共享对象。

【讨论】:

    【解决方案2】:

    您可以使用ExecutorService。 提交将执行任务并在完成后返回字符串的Callable

    提交Callable 时,您将获得Future,存储这些引用,例如在一个列表中。

    然后简单地遍历Futures 并通过调用Future#get 获取字符串。 如果尚未完成,这将阻塞直到任务完成,否则立即返回值。

    例子:

    ExecutorService exec = Executors.newFixedThreadPool(10);
    List<Future<String>> tasks = new ArrayList<Future<String>>();
    tasks.add(exec.submit(new Callable<String> {
        public String call() {
           //do stuff
           return <yourString>;
        }
    }));
    
    //and so on for the other tasks
    
    for (Future<String> task : tasks) {
        String result = task.get();
        //write to output
    }
    

    【讨论】:

    • awaitTermination() “阻塞直到所有任务完成执行关闭请求后”所以在提交所有任务后应该需要调用shutdown()。但由于 Future.get() 也会阻塞,所以不需要调用 awaitTermination()。
    • 你是对的,关机和等待是没有必要的。谢谢!
    【解决方案3】:

    多线程处理,一个线程写入和它们之间的消息队列是一个很好的策略。需要解决的问题是知道所有工作何时完成。一种方法是计算你启动了多少工作线程,然后计算你得到了多少响应。类似这样的伪代码:

    int workers = 0
    for each work item {
       workers++
       start the item's worker in a separate thread
    }
    while workers > 0 {
       take worker's response from a queue
       write response to file
       workers--
    }
    

    如果工作人员在执行时可以找到更多工作项,这种方法也有效。只需在工作人员响应中包含任何其他尚未处理的工作,然后像往常一样增加工作人员计数并启动工作人员线程。

    如果每个worker只返回一条消息,你可以使用Java的ExecutorService来执行返回结果的Callable实例。 ExecutorService 的方法可以访问 Future 实例,当 Callable 完成工作时,您可以从中获取结果。

    因此,您首先将所有任务提交给 ExecutorService,然后遍历所有 Futures 并获取它们的响应。这样,您将按照检查期货的顺序编写响应,这可能与他们完成工作的顺序不同。如果延迟不重要,那应该不是问题。否则,消息队列(如上所述)可能更合适。

    【讨论】:

      【解决方案4】:

      不清楚您的输出文件是否有一些定义的顺序,或者您是否只是将数据转储到那里。我认为它没有顺序。

      我不明白为什么您需要一个额外的线程来写入输出。只需synchronized 写入文件并在每个线程结束时调用它的方法。

      【讨论】:

        【解决方案5】:

        如果您有多个线程写入同一个文件,最简单的做法是在任务中写入该文件。

        final PrintWriter out = 
        ExecutorService es =
        for(int i=0;i<tasks;i++)
            es.submit(new Runnable() {
                public void run() {
                    performCalculations();
                    // so only one thread can write to the file at a time.
                    synchornized(out) {
                        writeResults(out);
                    }
                }
            });
         es.shutdown();
         es.awaitTermination(1, TimeUnit.HOUR);
         out.close();
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2020-01-03
          • 2012-10-17
          • 2014-10-28
          • 1970-01-01
          • 2011-11-25
          • 2012-05-16
          • 1970-01-01
          • 2012-09-14
          相关资源
          最近更新 更多