【问题标题】:Is a "Chain of Threads" a bad solution for this Java application?对于这个 Java 应用程序来说,“线程链”是一个糟糕的解决方案吗?
【发布时间】:2015-07-07 09:28:46
【问题描述】:

我正在运行一个程序,我在该程序中下载大文件,解析它们,然后将从文件中提取的数据写入另一个文件。

文件下载和解析需要很长时间,但写入任务平均只需一分钟左右。我的解决方案是拥有三个由三个线程组成的固定线程池。

ExecutorService downloadExecutor = Executors.newFixedThreadPool(3);
ExecutorService parseExecutor = Executors.newFixedThreadPool(3);
ExecutorService writeExecutor = Executors.newFixedThreadPool(3);

下载池中的一个线程下载文件,然后以文件名作为参数向解析器线程池提交一个新线程。这是在线程本身内完成的。然后下载线程开始从文件 URL 列表中下载另一个文件。

一旦解析器线程完成对文件中我想要的数据的解析,它就会将包含数据的新线程提交到写入线程池,然后将其写入 .csv 文件。

我的问题是是否有更优雅的解决方案。我还没有真正做过很多复杂的线程。由于我有很多文件要下载和解析,我不希望任何线程在任何时候都处于空闲状态。再次的想法是,由于解析文件可能需要一段时间,我不妨先创建单独的线程专门用于下载这些文件。

【问题讨论】:

    标签: java multithreading file-io


    【解决方案1】:

    为什么不只使用一个线程池。无论如何,下载、解析和保存都必须相互等待,因此最好的任务分离是每个文件使用一个线程。

    【讨论】:

    • 好吧,正如我所提到的,这些文件的大小可以是几 GB。解析可能需要 30 多分钟到一个小时才能完成。我的逻辑是,与其在下载新文件之前等待文件被解析,不如使用单独的线程继续下载文件,当文件下降时,将该文件提交到线程池。这将导致等待解析的文件队列。对我来说这听起来更有效率。
    • 先做最简单的事情@GreenGodot。无论如何,您的整体操作受到最慢部分的限制...您真的希望磁盘上等待数百 GB 的文件吗?
    • 如果有帮助,我会在完成文件解析后使用 File.delete() 删除下载的文件。它不能完全解决空间问题,但有帮助吗?
    • 实际上,在解析器的 ExecutorService 上实现一个 BlockingQueue 可以解决空间问题吗?由于线程的提交是通过下载线程完成的,如果队列已满,下载线程是否会暂停,直到它可以提交解析器线程?由于解析器线程在完成后会删除文件,这将是一种控制磁盘空间的方法。
    • @GreenGodot 如果您没有 CPU 来运行线程或没有数据输入线程,那么创建线程是没有意义的。如果下载速度快但解析速度慢,您确实可以将下载的文件排队,但您必须确保您控制了队列大小。
    【解决方案2】:

    这不是一个坏习惯,因为许多开发人员都会进行类似的编码。但是有一些事情你需要记住。

    第一,你不能指望性能会因为你有更多的线程而增加。有基于 CPU 数量的最佳线程数。

    第二,您必须确定如何处理异常。

    第三,您必须确保在需要停止应用程序的事件中可以关闭所有线程池。

    【讨论】:

      【解决方案3】:

      所以你的问题有两个方面:

      1. 计算绑定
      2. IO 绑定

      文件的读写是IO绑定的。异步 IO 最适合 IO 绑定任务。 Java 有AsynchronousFileChannel,它允许您读取和写入文件,而不必担心线程池通过完成处理程序实现连续性。 Complete Example.

      AsynchronousFileChannel ch = AsynchronousFileChannel.open(path);
          final ByteBuffer buf = ByteBuffer.allocate(1024);
          ch.read(buf, 0, 0,
                  new CompletionHandler() {
                      public void completed(Integer result, Integer length){        
                          ..
                      }
                      
                      public void failed(Throwable exc, Integer length) {
                          ..
                      }
                  }
          );
      

      你对写入做同样的事情,你只是写到频道

      ch.write(...
      

      不解析文件,这是一个计算绑定任务,你应该让你的 CPU 内核变热,你可以分配一个线程池等于你拥有的内核数。

       executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
      

      现在要记住的是:您需要测试您的代码,而测试并发代码很困难。如果你不能证明它的正确性,就不要这样做。

      【讨论】:

        猜你喜欢
        • 2011-07-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-03-11
        • 2020-09-09
        • 1970-01-01
        • 2011-01-13
        相关资源
        最近更新 更多