【问题标题】:How to use multiple CSV files in mapreduce如何在 mapreduce 中使用多个 CSV 文件
【发布时间】:2024-05-20 18:05:02
【问题描述】:

首先,我将解释我要做什么。首先,我将输入文件(第一个 CSV 文件)放入 mapreduce 作业中,其他 CSV 文件将放入映射器类中。但事情就是这样。 mapper 类中的代码不能正常工作,就像右下角的代码一样。我想组合两个 CSV 文件以在每个 CSV 文件中使用多个列。 例如,1个文件有BibNum(用户账号)、checkoutdatetime(图书checkoutdatetime)和itemtype(图书itemtype),2个CSV文件有BibNum(用户账号)、Title(图书标题)、Itemtype等。我想知道下个月可能会借到哪本书。如果您知道可以合并两个 CSV 文件并为我提供任何帮助的方法,我将不胜感激。如果您对我的代码有任何疑问,请告诉我,我会尽力澄清。

 Path p = new Path("hdfs://0.0.0.0:8020/user/training/Inventory_Sample");
       FileSystem fs = FileSystem.get(conf);

       BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));


        try {


            String BibNum = "Test";
            //System.out.print("test");
            while(br.readLine() != null){
                //System.out.print("test");
                if(!br.readLine().startsWith("BibNumber")) {
                    String subject[] = br.readLine().split(",");
                    BibNum = subject[0];
                }
            }

.

import java.io.BufferedReader;

import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.HashMap;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;



public class StubMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outkey = new Text();
    //private MinMaxCountTuple outTuple = new MinMaxCountTuple();

    //String csvFile = "hdfs://user/training/Inventory_Sample";
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {


       Configuration conf = context.getConfiguration();
       //conf.addResource("/etc/hadoop/conf/core-site.xml");
       //conf.addResource("/etc/hadoop/conf/hdfs-site.xml");
       Path p = new Path("hdfs://0.0.0.0:8020/user/training/Inventory_Sample");
       FileSystem fs = FileSystem.get(conf);

       BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));


        try {


            String BibNum = "Test";
            //System.out.print("test");
            while(br.readLine() != null){
                //System.out.print("test");
                if(!br.readLine().startsWith("BibNumber")) {
                    String subject[] = br.readLine().split(",");
                    BibNum = subject[0];
                }
            }

            if(value.toString().startsWith("BibNumber"))
            {
                return;
            }

            String data[] = value.toString().split(",");

            String BookType = data[2];
            String DateTime = data[5];

            SimpleDateFormat frmt = new SimpleDateFormat("MM/dd/yyyy hh:mm:ss a");

            Date creationDate = frmt.parse(DateTime);
            frmt.applyPattern("dd-MM-yyyy");
            String dateTime = frmt.format(creationDate);

            //outkey.set(BookType + " " + dateTime);
            outkey.set(BibNum + " " + BookType + " " + dateTime);

            //outUserId.set(userId);
            context.write(outkey, new IntWritable(1));

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally{
            br.close();
        }


    }
}

【问题讨论】:

    标签: java csv file-io mapreduce


    【解决方案1】:

    您正在读取映射器代码中的 CSV 文件。

    如果您使用路径在映射器中打开文件,我猜您使用的是分布式缓存,那么只有文件会随 jar 一起发送到应该运行 map reduce 的每个节点。

    有一种方法可以组合,但不是在映射器中。

    您可以尝试以下方法:-

    1) 为两个不同的文件编写 2 个单独的映射器。

    2) 仅将映射器所需的字段发送到减速器。

    3) 在 reducer 中组合结果(因为你想加入某个特定的键)。 您可以查看多输入格式示例了解更多信息。

    【讨论】:

    • 如果您发布一些您刚才描述的示例的链接,那就太好了。
    最近更新 更多