【发布时间】:2019-06-09 23:20:51
【问题描述】:
所以我有一个大文本文件,在本例中它大约为 4.5 GB,我需要尽快处理整个文件。现在我使用 3 个线程(不包括主线程)进行了多线程处理。一个输入线程读取输入文件,一个处理线程处理数据,一个输出线程将处理后的数据输出到文件中。
目前,瓶颈是处理部分。因此,我想在组合中添加更多处理线程。但是,这会造成我有多个线程访问同一个 BlockingQueue 的情况,因此它们的结果不会保持输入文件的顺序。
我正在寻找的功能示例如下: 输入文件:1、2、3、4、5 输出文件:^ 同。不是 2、1、4、3、5 或任何其他组合。
我编写了一个虚拟程序,它在功能上与实际程序相同,但不包括处理部分,(由于处理类包含机密信息,我不能给你实际程序)。我还应该提到,所有类(输入、处理和输出)都是包含在 Main 类中的内部类,该 Main 类包含 initialise() 方法和下面列出的主线程代码中提到的类级变量。
主线程:
static volatile boolean readerFinished = false; // class level variables
static volatile boolean writerFinished = false;
private void initialise() throws IOException {
BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(1_000_000);
BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(1_000_000); // capacity 1 million.
String inputFileName = "test.txt";
String outputFileName = "outputTest.txt";
BufferedReader reader = new BufferedReader(new FileReader(inputFileName));
BufferedWriter writer = new BufferedWriter(new FileWriter(outputFileName));
Thread T1 = new Thread(new Input(reader, inputQueue));
Thread T2 = new Thread(new Processing(inputQueue, outputQueue));
Thread T3 = new Thread(new Output(writer, outputQueue));
T1.start();
T2.start();
T3.start();
while (!writerFinished) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
reader.close();
writer.close();
System.out.println("Exited.");
}
输入线程:(请原谅注释的调试代码,正在使用它来确保读取器线程实际正确执行)。
class Input implements Runnable {
BufferedReader reader;
BlockingQueue<String> inputQueue;
Input(BufferedReader reader, BlockingQueue<String> inputQueue) {
this.reader = reader;
this.inputQueue = inputQueue;
}
@Override
public void run() {
String poisonPill = "ChH92PU2KYkZUBR";
String line;
//int linesRead = 0;
try {
while ((line = reader.readLine()) != null) {
inputQueue.put(line);
//linesRead++;
/*
if (linesRead == 500_000) {
//batchesRead += 1;
//System.out.println("Batch read");
linesRead = 0;
}
*/
}
inputQueue.put(poisonPill);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
readerFinished = true;
}
}
处理线程:(通常这实际上会对线路做一些事情,但出于模型的目的,我刚刚将其立即推送到输出线程)。如有必要,我们可以通过让线程在每一行休眠一小段时间来模拟它做一些工作。
class Processing implements Runnable {
BlockingQueue<String> inputQueue;
BlockingQueue<String> outputQueue;
Processing(BlockingQueue<String> inputQueue, BlockingQueue<String> outputQueue) {
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
@Override
public void run() {
while (true) {
try {
if (inputQueue.isEmpty() && readerFinished) {
break;
}
String line = inputQueue.take();
outputQueue.put(line);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出线程:
class Output implements Runnable {
BufferedWriter writer;
BlockingQueue<String> outputQueue;
Output(BufferedWriter writer, BlockingQueue<String> outputQueue) {
this.writer = writer;
this.outputQueue = outputQueue;
}
@Override
public void run() {
String line;
ArrayList<String> outputList = new ArrayList<>();
while (true) {
try {
line = outputQueue.take();
if (line.equals("ChH92PU2KYkZUBR")) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer finished - executing termination");
writerFinished = true;
break;
}
line += "\n";
outputList.add(line);
if (outputList.size() == 500_000) {
for (String outputLine : outputList) {
writer.write(outputLine);
}
System.out.println("Writer wrote batch");
outputList = new ArrayList<>();
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
所以现在一般数据流是非常线性的,看起来像这样:
输入>处理>输出。
但我想要的是这样的:
但问题是,当数据输出时,要么需要按正确的顺序排序,要么需要已经按正确的顺序排列。
我们将不胜感激有关如何进行此操作的建议或示例。
过去我曾使用 Future 和 Callable 接口来解决涉及此类并行数据流的任务,但不幸的是,该代码不是从单个队列中读取的,因此在这里的帮助很小。
我还应该补充一点,对于那些会注意到这一点的人,batchSize 和toxicPill 通常在主线程中定义,然后通过变量传递,它们不是通常是硬编码的在输入线程的代码中,输出检查写入线程。在凌晨 1 点左右为实验编写模型时,我有点懒惰。
编辑:我还应该提到,这是最多使用 Java 8 所必需的。 Java 9 及更高版本的功能无法使用,因为这些版本未安装在运行此程序的环境中。
【问题讨论】:
-
1.您根据哪些数据得出处理是限制因素的结论? 2. 如果处理是限制因素,它是否使用了几乎所有可用的 CPU? 3. 你能不能让你的阅读器给每个线程传递一个序列号,让那个线程输出它自己的带有数字的文件,然后把文件放在一起?
-
我基于来自调试器的数据和 CPU 使用率。一个核心几乎总是固定的,而其他核心的使用率约为 40%。那个被钉住的核心正在处理。此外,调试 IO 线程会导致读取器线程在很长一段时间内被阻塞,因为它的读取速度比单个处理器的处理速度要快。写入线程受限于单个处理线程的输出速度,并没有那么快。调试显示 outputQueue 在峰值时从未超过 20 项。
-
此外,关于最后重新组装较小的文件,这在理论上可行,但我真的不想在最后重新组装 57+ 百万个小文件。为每个文件简单地打开一个新阅读器的开销是荒谬的。
-
如果您认为处理阶段是瓶颈,那么为什么不在处理数据批次 n 的同时单线程加载数据批次 n+1 呢?
-
输入线程已经这样做了。它将 2 个批次加载到 inputQueue 中,处理线程从 inputQueue 中取出内容,对其进行处理,然后将其放入 outputQueue 中。我想你可能误解了我所说的。文件读取不是瓶颈。实际的处理线程(对文件的每一行执行操作的线程)是瓶颈。上面的代码中没有显示这个瓶颈,因为正如我所说,对文件行进行的操作是机密的,不能在这里显示。
标签: java multithreading java-8