【问题标题】:Process large text file concurrently同时处理大文本文件
【发布时间】:2019-06-09 23:20:51
【问题描述】:

所以我有一个大文本文件,在本例中它大约为 4.5 GB,我需要尽快处理整个文件。现在我使用 3 个线程(不包括主线程)进行了多线程处理。一个输入线程读取输入文件,一个处理线程处理数据,一个输出线程将处理后的数据输出到文件中。

目前,瓶颈是处理部分。因此,我想在组合中添加更多处理线程。但是,这会造成我有多个线程访问同一个 BlockingQueue 的情况,因此它们的结果不会保持输入文件的顺序。

我正在寻找的功能示例如下: 输入文件:1、2、3、4、5 输出文件:^ 同。不是 2、1、4、3、5 或任何其他组合。

我编写了一个虚拟程序,它在功能上与实际程序相同,但不包括处理部分,(由于处理类包含机密信息,我不能给你实际程序)。我还应该提到,所有类(输入、处理和输出)都是包含在 Main 类中的内部类,该 Main 类包含 initialise() 方法和下面列出的主线程代码中提到的类级变量。

主线程:

static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;

private void initialise() throws IOException {
    BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
    BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million. 

    String inputFileName = "test.txt";
    String outputFileName = "outputTest.txt";

    BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
    BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));


    Thread T1 = new Thread(new Input(reader, inputQueue));
    Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
    Thread T3 = new Thread(new Output(writer, outputQueue));

    T1.start();
    T2.start();
    T3.start();

    while (!writerFinished) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    reader.close();
    writer.close();

    System.out.println("Exited.");
}

输入线程:(请原谅注释的调试代码,正在使用它来确保读取器线程实际正确执行)。

class Input implements Runnable {
    BufferedReader reader;
    BlockingQueue<String> inputQueue;

    Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
        this.reader = reader;
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        String poisonPill = "ChH92PU2KYkZUBR";
        String line;
        //int linesRead = 0;

        try {
            while ((line = reader.readLine()) != null) {
                inputQueue.put(line);
                //linesRead++;

                /*
                if (linesRead == 500_000) {
                    //batchesRead += 1;
                    //System.out.println("Batch read");
                    linesRead = 0;
                }
                */
            }

            inputQueue.put(poisonPill);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }

        readerFinished = true;

    }
}

处理线程:(通常这实际上会对线路做一些事情,但出于模型的目的,我刚刚将其立即推送到输出线程)。如有必要,我们可以通过让线程在每一行休眠一小段时间来模拟它做一些工作。

class Processing implements Runnable {
    BlockingQueue<String> inputQueue;
    BlockingQueue<String> outputQueue;

    Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (inputQueue.isEmpty() && readerFinished) {
                    break;
                }

                String line = inputQueue.take();
                outputQueue.put(line);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

输出线程:

class Output implements Runnable {
    BufferedWriter writer;
    BlockingQueue<String> outputQueue;

    Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
        this.writer = writer;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        String line;
        ArrayList<String> outputList = new ArrayList<>();

        while (true) {
            try {
                line = outputQueue.take();

                if (line.equals("ChH92PU2KYkZUBR")) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer finished - executing termination");

                    writerFinished = true;
                    break;
                }

                line += "\n";
                outputList.add(line);

                if (outputList.size() == 500_000) {
                    for (String outputLine : outputList) {
                        writer.write(outputLine);
                    }
                    System.out.println("Writer wrote batch");
                    outputList = new ArrayList<>();
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

所以现在一般数据流是非常线性的,看起来像这样:

输入>处理>输出。

但我想要的是这样的:

但问题是,当数据输出时,要么需要按正确的顺序排序,要么需要已经按正确的顺序排列。

我们将不胜感激有关如何进行此操作的建议或示例。

过去我曾使用 Future 和 Callable 接口来解决涉及此类并行数据流的任务,但不幸的是,该代码不是从单个队列中读取的,因此在这里的帮助很小。

我还应该补充一点,对于那些会注意到这一点的人,batchSize 和toxicPill 通常在主线程中定义,然后通过变量传递,它们不是通常是硬编码的在输入线程的代码中,输出检查写入线程。在凌晨 1 点左右为实验编写模型时,我有点懒惰。

编辑:我还应该提到,这是最多使用 Java 8 所必需的。 Java 9 及更高版本的功能无法使用,因为这些版本未安装在运行此程序的环境中。

【问题讨论】:

  • 1.您根据哪些数据得出处理是限制因素的结论? 2. 如果处理是限制因素,它是否使用了几乎所有可用的 CPU? 3. 你能不能让你的阅读器给每个线程传递一个序列号,让那个线程输出它自己的带有数字的文件,然后把文件放在一起?
  • 我基于来自调试器的数据和 CPU 使用率。一个核心几乎总是固定的,而其他核心的使用率约为 40%。那个被钉住的核心正在处理。此外,调试 IO 线程会导致读取器线程在很长一段时间内被阻塞,因为它的读取速度比单个处理器的处理速度要快。写入线程受限于单个处理线程的输出速度,并没有那么快。调试显示 outputQueue 在峰值时从未超过 20 项。
  • 此外,关于最后重新组装较小的文件,这在理论上可行,但我真的不想在最后重新组装 57+ 百万个小文件。为每个文件简单地打开一个新阅读器的开销是荒谬的。
  • 如果您认为处理阶段是瓶颈,那么为什么不在处理数据批次 n 的同时单线程加载数据批次 n+1 呢?
  • 输入线程已经这样做了。它将 2 个批次加载到 inputQueue 中,处理线程从 inputQueue 中取出内容,对其进行处理,然后将其放入 outputQueue 中。我想你可能误解了我所说的。文件读取不是瓶颈。实际的处理线程(对文件的每一行执行操作的线程)是瓶颈。上面的代码中没有显示这个瓶颈,因为正如我所说,对文件行进行的操作是机密的,不能在这里显示。

标签: java multithreading java-8


【解决方案1】:

你能做什么:

  • 使用 X 个线程进行处理,其中 X 是可用于处理的内核数
  • 为每个线程提供自己的输入队列。
  • 读取器线程以可预测的方式将记录提供给每个线程的输入队列循环。
  • 由于输出文件对于内存来说太大了,你写了 X 个输出文件,每个线程一个,每个文件名中都有线程的索引,这样你就可以从文件名中重构原始顺序。
  • 该过程完成后,合并 X 输出文件。线程 1 的文件中的一行,线程 2 的文件中的一行,以此类推,再次以循环方式。这重新构成了原始订单。

额外的好处是,由于每个线程都有一个输入队列,因此读者之间的队列不会发生锁争用。 (仅在读取器和写入器之间)您甚至可以通过将大于 1 的批次放入输入队列中来优化这一点。

【讨论】:

  • 我会考虑做一个模型。如果它被证明是有效的,我会将其标记为已接受的答案。
【解决方案2】:

正如 Alexei 也提出的,您可以创建 OrderedTask:

class OrderedTask implements Comparable<OrderedTask> {

    private final Integer index;
    private final String line;

    public OrderedTask(Integer index, String line) {
        this.index = index;
        this.line = line;
    }


    @Override
    public int compareTo(OrderedTask o) {
        return index < o.getIndex() ? -1 : index == o.getIndex() ? 0 : 1;
    }

    public Integer getIndex() {
        return index;
    }

    public String getLine() {
        return line;
    }    
}

作为输出队列,您可以使用自己的优先级队列:

class OrderedTaskQueue {

    private final ReentrantLock lock;
    private final Condition waitForOrderedItem;
    private final int maxQueuesize;
    private final PriorityQueue<OrderedTask> backedQueue;

    private int expectedIndex;

    public OrderedTaskQueue(int maxQueueSize, int startIndex) {
        this.maxQueuesize = maxQueueSize;
        this.expectedIndex = startIndex;
        this.backedQueue = new PriorityQueue<>(2 * this.maxQueuesize);

        this.lock = new ReentrantLock();
        this.waitForOrderedItem = this.lock.newCondition();
    }


    public boolean put(OrderedTask item) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (this.backedQueue.size() >= maxQueuesize && item.getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }

            boolean result = this.backedQueue.add(item);
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }


    public OrderedTask take() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (this.backedQueue.peek() == null || this.backedQueue.peek().getIndex() != expectedIndex) {
                this.waitForOrderedItem.await();
            }
            OrderedTask result = this.backedQueue.poll();
            expectedIndex++;
            this.waitForOrderedItem.signalAll();
            return result;
        } catch (InterruptedException e) {
            throw new RuntimeException();
        } finally {
            lock.unlock();
        }
    }
}

StartIndex 是第一个有序任务的索引,并且 ma​​xQueueSize 用于在我们等待某个较早的任务完成时停止处理其他任务(而不是填充内存)。它应该是处理线程数的两倍/三倍,不会立即停止处理并允许可扩展性。

那么你应该创建你的任务:

int indexOrder =0;
            while ((line = reader.readLine()) != null) {
                inputQueue.put(new OrderedTask(indexOrder++,line);                    

            }

仅因您的示例而使用逐行。您应该更改 OrderedTask 以支持该批次的行。

【讨论】:

  • 我对这种设计的唯一担心是它可能不如 Erwin 的性能好。当我有时间时,我会测试这两种设计并运行各种性能测试,看看哪个更快。无论哪种设计最终更快,都可能是选择的答案。不过,我感谢为此付出的努力,感谢您向我展示了如何实现这一目标。
  • 我期待您的表现结果。如果处理批次的大小相同并且所有批次的计算都相似,我会说这个解决方案应该更有效,因为它不会增加写入/合并多个文件的复杂性。锁争用应该不是问题,因为您的任务是计算敏感的。
  • 模型可能需要一些时间,因为我不再每天都在工作,但是我已经在家里花费了相当多的时间。我们会看看事情进展如何,但我会在完成后回复大家。
  • 在对此进行测试期间,我遇到了线程在队列为空时无法正确终止的问题,尽管我的毒丸逻辑。我正在调查。
  • 问题已解决。与您的队列无关,毒丸中有一个小错误,破坏了平等检查。使用此方法可在 3 分 30 秒内在第 7 代 i7(无超线程)上处理 5 GB 文本文件。不用说,我印象非常深刻。非常感谢您在这件事上提供的帮助。
【解决方案3】:

为什么不倒流?

  1. X 批次的输出调用;
  2. 生成X个promise/task(promise模式)谁会随机调用其中一个处理核心(保留一个批号,传递给输入核心);将调用处理程序批处理到有序列表中;
  3. 每个处理核心调用输入核心中的一个批次;
  4. 享受吗?

【讨论】:

  • 因为要知道批次的数量,我必须将整个文件读入内存。由于内存限制,这不是一个选项,这也是我在输入线程中逐行处理文件的原因,而不是直接将整个文件转储到 ram 中。话虽如此,如果我在队列提供的有限空间的情况下这样做,它可能会起作用。 inputQueue 通常在任何给定时间最多包含 2 个批次。但我可以尝试一次做一批。
  • 您不需要知道批次数量/整个文件即可应用该流程。你已经有了死亡药丸的概念;扭转它:)
猜你喜欢
  • 2011-05-16
  • 1970-01-01
  • 1970-01-01
  • 2016-09-02
  • 1970-01-01
  • 2022-06-15
  • 2013-08-13
  • 2013-10-25
  • 1970-01-01
相关资源
最近更新 更多