【问题标题】:A Java theading program which reads lines of a huge CSV file一个读取巨大 CSV 文件行的 Java 主题程序
【发布时间】:2012-06-26 11:15:02
【问题描述】:

我有一个包含超过 700K + 行的巨大 CSV 文件。我必须解析该 CSV 文件的行并执行操作。我想通过使用线程来做到这一点。我一开始尝试做的很简单。每个线程都应处理 CSV 文件的唯一行。我的行数有限,只能读取 3000 行。我创建了三个线程。每个线程都应该读取 CSV 文件的一行。以下是代码:

import java.io.*;

class CSVOps implements Runnable
{
    static int lineCount = 1;
    static int limit = 3000;
    BufferedReader CSVBufferedReader;

    public CSVOps(){} // Default constructor

    public CSVOps(BufferedReader br){
        this.CSVBufferedReader = br;
    }

    private synchronized void readCSV(){
        System.out.println("Current thread "+Thread.currentThread().getName());
        String line;
        try {
            while((line = CSVBufferedReader.readLine()) != null){
                System.out.println(line);
                lineCount ++;
                if(lineCount >= limit){
                    break;
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        readCSV();
    }

}

class CSVResourceHandler
{
    String CSVPath;

    public CSVResourceHandler(){ }// default constructor

    public CSVResourceHandler(String path){
        File f = new File(path);
        if(f.exists()){
            CSVPath = path;
        }
        else{
            System.out.println("Wrong file path! You gave: "+path);
        }
    }

    public BufferedReader getCSVFileHandler(){
        BufferedReader br = null;
        try{
            FileReader is = new FileReader(CSVPath);
            br = new BufferedReader(is);
        }
        catch(Exception e){
        }
        return br;
    }
}

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/invalid_domain_kw_site_wise_click_rev2.csv";
        CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV);
        CSVOps ops = new CSVOps(csvResHandler.getCSVFileHandler());

        Thread t1 = new Thread(ops);
        t1.setName("T1");

        Thread t2 = new Thread(ops);
        t1.setName("T2");

        Thread t3 = new Thread(ops);
        t1.setName("T3");

        t1.start();
        t2.start();
        t3.start();
    }
}

类 CSVResourceHandler 简单查找传递的文件是否存在,然后创建一个 BufferedReader 并提供它。这个阅读器被传递给CSVOps 类。它有一个方法 readCSV,它读取 CSV 文件的单行并打印出来。限制设置为 3000。

现在为了让线程不搞乱计数,我将这些限制和计数变量都声明为静态的。当我运行这个程序时,我得到了奇怪的输出。我只得到大约 1000 条记录,有时我得到 1500 条。它们是随机排列的。在输出结束时,我得到两行 CSV 文件,当前线程名称为 main!!

我是一个线程新手。我希望阅读这个 CSV 文件变得更快。可以做什么?

【问题讨论】:

  • 使用多个阅读线程无济于事。阻塞点不是 CPU 而是 IO。
  • 我建议您使用一个线程,并在读取每一行时将其传递到一个可由第二个线程使用的队列。这样可以确保保留订单。
  • @dystroy:但是在这种情况下,可以做些什么呢?
  • 谁能解释一下为什么只有任何一个线程的输出是可见的?我设置了 3000 行的限制,我每次都得到大约 1000 行
  • 使用多个线程来加速 IO 是很困难的,但是将一个文件分成几行字符串可能会非常昂贵,使用单独的线程来做到这一点可能很有用。

标签: java multithreading


【解决方案1】:

好的,首先,不要使用多个线程从单个机械磁盘执行并行 I/O。它实际上会降低性能,因为每次线程有机会运行时,机械头都需要寻找下一个读取位置。因此,您不必要地在磁盘的磁头周围弹跳,这是一项昂贵的操作。

使用单生产者多消费者模型使用单个线程读取行并使用工作池处理它们。

关于你的问题:

在退出 main 之前,您实际上不应该等待线程完成吗?

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        ...
        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();
    }
}

【讨论】:

  • 默认情况下,java 线程不是守护进程,所以他不必这样做。
  • Use a single producer multiple consumer model to read lines using a single thread and process them using a pool of workers.。谢谢你的提示,我刚上它。而 join() 实际上将一个线程的处理排在另一个队列中,这样在所有线程完成之前不会停止主线程,对吗?我的意思是理论上是这样的:O
  • @Shades88:实际上没有。在所有线程启动后,它们与主线程并行运行并且彼此并行运行。 join 只是保证主线程在其子线程完成之前不会取得进展,但子线程仍将彼此并行工作。
【解决方案2】:

我建议大块阅读文件。分配一个大缓冲区对象,读取一个块,从末尾解析以找到最后一个 EOL 字符,将缓冲区的最后一位复制到临时字符串中,在 EOL+1 处将 null 推入缓冲区,从缓冲区排队引用,立即创建一个新的,首先复制临时字符串,然后填充缓冲区的其余部分并重复直到 EOF。重复直到完成。使用线程池来解析/处理缓冲区。

您必须将整块有效行排队。排队单行将导致线程通信花费的时间比解析时间长。

请注意,这和类似的情况可能会导致池中的线程“无序”处理块。如果必须保留顺序(例如,输入文件已排序,而输出将进入另一个必须保持排序的文件),则可以让块组装器线程在每个块对象中插入一个序列号。然后,池线程可以将处理过的缓冲区传递给另一个线程(或任务),该线程(或任务)保留一个乱序块列表,直到所有先前的块都进入为止。

多线程不一定是困难/危险/无效的。如果您使用队列/池/任务,请避免同步/加入,不要不断地创建/终止/销毁线程,并且只在一次只有一个线程可以处理的大型缓冲区对象周围排队。您应该会看到一个很好的加速,几乎没有死锁、错误共享等的可能性。

这种加速的下一步是预先分配缓冲区池队列,以消除缓冲区和相关 GC 的持续创建/删除,并在开始时使用(L1 缓存大小)“死区”每个缓冲区完全消除缓存共享。 这在多核机器上会很快(尤其是使用 SSD!)。

哦,Java,对。我为我用空终止符回答的“CplusPlus-iness”道歉。不过,其余的点都还可以。这应该是一个与语言无关的答案:)

【讨论】:

  • 感谢您详细解答。 This should be a language-agnostic answer。是的,我会记住这一点,看看我是否可以在 java 中使用它:)
猜你喜欢
  • 2021-05-29
  • 2011-05-06
  • 2016-11-26
  • 1970-01-01
  • 2020-08-07
  • 2014-09-30
  • 2017-08-11
相关资源
最近更新 更多