【问题标题】:Multithreading a massive file read多线程读取大量文件
【发布时间】:2012-05-16 18:58:07
【问题描述】:

我仍在思考 Java 中并发的工作原理。我知道(如果您订阅了 OO Java 5 并发模型)您使用 run()call() 方法(分别)实现了 TaskCallable,并且您应该尽可能多地并行化尽可能实现该方法。

但我仍然不了解 Java 并发编程的一些内在特性:

  • 如何为Taskrun() 方法分配要执行的适量并发工作?

作为一个具体的例子,如果我有一个 I/O-bound readMobyDick() 方法,该方法将 Herman Melville 的 Moby Dick 的全部内容从本地系统上的文件读取到内存中。假设我希望这个 readMobyDick() 方法是并发的并由 3 个线程处理,其中:

  • 线程 #1 将书的前 1/3 读入内存
  • 线程 #2 将书的第二个 1/3 读入内存
  • 线程 #3 将书的最后 1/3 读入内存

我是否需要将 Moby Dick 分成三个文件并将它们分别传递给自己的任务,或者我是否只需从实现的 run() 方法和(不知何故)Executor 内部调用 readMobyDick()知道如何在线程之间分解工作。

我是一个非常直观的学习者,因此非常感谢任何正确方法的代码示例!谢谢!

【问题讨论】:

  • 很棒的方法名!当我尝试阅读《白鲸记》时,我发现我也必须同时阅读(与其他书籍交错);-)
  • 那么...大白鲸应该是 .NET,是吗?

标签: java multithreading concurrency


【解决方案1】:

您可能无意中选择了并行活动中绝对最糟糕的示例!

从单个机械磁盘并行读取实际上比使用单个线程读取要慢,因为您实际上是在轮到每个线程运行时将机械头弹跳到磁盘的不同部分。最好将其保留为单线程活动。

让我们再举一个例子,它与你的相似,但实际上可以提供一些好处:假设我想在一个巨大的单词列表中搜索某个单词的出现(这个列表甚至可能来自磁盘文件,但就像我说的那样,由一个线程读取)。假设我可以像您的示例中那样使用 3 个线程,每个线程在巨大的单词列表的 1/3 上进行搜索,并保留一个本地计数器来记录搜索到的单词出现的次数。

在这种情况下,您希望将列表分成 3 个部分,将每个部分传递给类型实现 Runnable 的不同对象,并在 run 方法中实现搜索。

运行时本身不知道如何进行分区或类似的操作,您必须自己指定。还有很多其他的分区策略,各有优缺点,但我们现在可以坚持静态分区。

让我们看一些代码:

class SearchTask implements Runnable {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

这应该很好用。请注意,在实际情况下,您将构建更通用的分区方案。如果您希望返回结果,您也可以使用ExecutorService 并实现Callable 而不是Runnable

所以一个使用更高级结构的替代示例:

class SearchTask implements Callable<Integer> {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }        
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
List<Callable> tasks = new ArrayList<Callable>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future> results = exec.invokeAll(tasks);

// wait for tasks to finish and collect results
int counter = 0;
for(Future f: results) {
    counter += f.get();
}

【讨论】:

  • 那么什么是可以从多线程中受益的任务的好例子呢?我真的一点都不关心从磁盘读取文件——我关心的是看到一个活生生的、呼吸的(代码)示例,说明如何将工作分块并提供给任务。
  • @herpylderp:现在应该完成了。
  • 哇很好的答案@Tudor!我唯一的后续是:当您谈到分区方案(您提到“静态分区”)时,这些 Java 构造是否在并发包中可用,或者这些理论策略是否对任何语言/环境都通用?我问是因为它们似乎是我没有“得到”的核心。再次感谢(和 +1)!
  • @herpylderp:另一种方案是“按需”。这通常在您的任务多于可用线程并且任务需要不同的时间才能完成时使用。在这种情况下,最好使用线程池并让线程“按需”执行任务,而不是在开始时手动分配它们以确保负载平衡工作。
  • @herpylderp:线程池(就像我上面使用的那样)在内部存储了一个任务队列和一堆线程。线程不断循环查看队列中是否有可用的任务,如果有,它们会执行一个并执行它,然后再返回执行更多任务。用户只需使用submit 将任务放入队列中,线程就会按照我解释的方式执行它们。
【解决方案2】:

您选择了一个不好的例子,因为 Tudor 非常好心地指出了这一点。旋转磁盘硬件受到移动盘片和磁头的物理约束,最有效的读取实现是按顺序读取每个块,这样可以减少移动磁头或等待磁盘对齐的需要。

也就是说,某些操作系统并不总是将数据连续存储在磁盘上,如果您的操作系统/文件系统没有为您完成这项工作,碎片整理可以提高磁盘性能。

正如您提到的想要一个有益的程序,让我建议一个简单的程序,矩阵加法。

假设您为每个内核创建了一个线程,您可以轻松地将要添加的任意两个矩阵分成 N(每个线程一个)行。矩阵加法(如果你还记得的话)是这样工作的:

A + B = C

[ a11, a12, a13 ]   [ b11, b12, b13]  =  [ (a11+b11), (a12+b12), (a13+c13) ]
[ a21, a22, a23 ] + [ b21, b22, b23]  =  [ (a21+b21), (a22+b22), (a23+c23) ]
[ a31, a32, a33 ]   [ b31, b32, b33]  =  [ (a31+b31), (a32+b32), (a33+c33) ]

因此,要将其分布在 N 个线程中,我们只需将行数和模数除以线程数即可得到将添加的“线程 ID”。

matrix with 20 rows across 3 threads
row % 3 == 0 (for rows 0, 3, 6,  9, 12, 15, and 18)
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
// row 20 doesn't exist, because we number rows from 0

现在每个线程“知道”它应该处理哪些行,并且“每行”的结果可以很简单地计算出来,因为结果不会跨入其他线程的计算域

现在所需要的只是一个“结果”数据结构,它跟踪计算值的时间,以及设置最后一个值的时间,然后计算完成。在这个带有两个线程的矩阵加法结果的“假”示例中,使用两个线程计算答案大约需要一半的时间。

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only.  Real Threads are scheduled across cores due to
// availability and attempts to prevent unnecessary core migration of a running thread.
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)

更复杂的问题可以通过多线程来解决,不同的问题用不同的技术解决。我特意挑选了一个最简单的例子。

【讨论】:

    【解决方案3】:

    您使用 run() 或 call() 方法实现 Task 或 Callable (分别),并且您应该尽可能多地并行化 尽可能实现的方法。

    Task 代表一个离散的工作单元
    将文件加载到内存中是一个离散的工作单元,因此可以将此活动委托给后台线程。 IE。后台线程运行此加载文件的任务。
    它是一个离散的工作单元,因为它不需要其他依赖项来完成其工作(加载文件)并且具有离散的边界。
    您要问的是进一步将其划分为任务。 IE。一个线程加载文件的 1/3,而另一个线程加载 2/3 等等。
    如果您能够将任务划分为更多的子任务,那么根据定义,它一开始就不是任务。因此,加载文件本身就是一项任务。

    举个例子:
    假设您有一个 GUI,您需要向用户展示来自 5 个不同文件的数据。为了呈现它们,您还需要准备一些数据结构来处理实际数据。
    所有这些都是单独的任务。
    例如。文件的加载是 5 个不同的任务,因此可以由 5 个不同的线程完成。
    数据结构的准备可以在不同的线程中完成。
    GUI 当然在另一个线程中运行。
    所有这些都可以同时发生

    【讨论】:

      【解决方案4】:

      如果您的系统支持高吞吐量 I/O,您可以这样做:

      How to read a file using multiple threads in Java when a high throughput(3GB/s) file system is available

      这里是多线程读取单个文件的解决方案。

      将文件分成N个块,在一个线程中读取每个块,然后按顺序合并。注意跨越块边界的线。这是用户建议的基本思想 slaks

      对单个 20 GB 文件的多线程实现进行基准测试:

      1 线程:50 秒:400 MB/s

      2 个线程:30 秒:666 MB/s

      4 线程:20 秒:1GB/s

      8 线程:60 秒:333 MB/s

      等效的 Java7 readAllLines():400 秒:50 MB/s

      注意:这可能仅适用于旨在支持高吞吐量 I/O 的系统,而不适用于普通的个人计算机

      这是代码的基本细节,完整的详细信息,请点击链接

      public class FileRead implements Runnable
      {
      
      private FileChannel _channel;
      private long _startLocation;
      private int _size;
      int _sequence_number;
      
      public FileRead(long loc, int size, FileChannel chnl, int sequence)
      {
          _startLocation = loc;
          _size = size;
          _channel = chnl;
          _sequence_number = sequence;
      }
      
      @Override
      public void run()
      {
              System.out.println("Reading the channel: " + _startLocation + ":" + _size);
      
              //allocate memory
              ByteBuffer buff = ByteBuffer.allocate(_size);
      
              //Read file chunk to RAM
              _channel.read(buff, _startLocation);
      
              //chunk to String
              String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));
      
              System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);
      
      }
      
      //args[0] is path to read file
      //args[1] is the size of thread pool; Need to try different values to fing sweet spot
      public static void main(String[] args) throws Exception
      {
          FileInputStream fileInputStream = new FileInputStream(args[0]);
          FileChannel channel = fileInputStream.getChannel();
          long remaining_size = channel.size(); //get the total number of bytes in the file
          long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads
      
      
          //thread pool
          ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));
      
          long start_loc = 0;//file pointer
          int i = 0; //loop counter
          while (remaining_size >= chunk_size)
          {
              //launches a new thread
              executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
              remaining_size = remaining_size - chunk_size;
              start_loc = start_loc + chunk_size;
              i++;
          }
      
          //load the last remaining piece
          executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));
      
          //Tear Down
      
      }
      
      }
      

      【讨论】:

        猜你喜欢
        • 2017-11-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-06-02
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多