【问题标题】:Hadoop 2: Empty result when using custom InputFormatHadoop 2:使用自定义 InputFormat 时结果为空
【发布时间】:2016-10-29 05:42:40
【问题描述】:

我想使用自己的 FileInputFormat 和自定义 RecordReader 将 csv 数据读入 <Long><String> 对。

因此我创建了MyTextInputFormat类:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class MyTextInputFormat extends FileInputFormat<Long, String> {

  @Override
  public RecordReader<Long, String> getRecordReader(InputSplit input, JobConf job, Reporter reporter) throws IOException {
      reporter.setStatus(input.toString());
      return new MyStringRecordReader(job, (FileSplit)input);
  }

  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return super.isSplitable(fs, filename);
  }
}

还有班级MyStringRecordReader

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class MyStringRecordReader implements RecordReader<Long, String> {

    private LineRecordReader lineReader;
    private LongWritable lineKey;
    private Text lineValue;

    public MyStringRecordReader(JobConf job, FileSplit split) throws IOException {
        lineReader = new LineRecordReader(job, split);

        lineKey = lineReader.createKey();
        lineValue = lineReader.createValue();

        System.out.println("constructor called");
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }

    @Override
    public Long createKey() {
        return lineKey.get();
    }

    @Override
    public String createValue() {
        System.out.println("createValue called");
        return lineValue.toString();
    }

    @Override
    public long getPos() throws IOException {
        return lineReader.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        return lineReader.getProgress();
    }

    @Override
    public boolean next(Long key, String value) throws IOException {
        System.out.println("next called");

        // get the next line
        if (!lineReader.next(lineKey, lineValue)) {
            return false;
        }

        key = lineKey.get();
        value = lineValue.toString();

        System.out.println(key);
        System.out.println(value);


        return true;
    }
}

在我的 Spark 应用程序中,我通过调用 sparkContext.hadoopFile 方法来读取文件。但我只从以下代码中得到一个空输出

public class AssociationRulesAnalysis {

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        JavaRDD<String> inputRdd = sc.hadoopFile(inputFilePath, MyTextInputFormat.class, Long.class, String.class).map(new Function<Tuple2<Long,String>, String>() {
            @Override
            public String call(Tuple2<Long, String> arg0) throws Exception {
                System.out.println("map: " + arg0._2());
                return arg0._2();
            }
        });

        List<String> asList = inputRdd.take(10);
        for(String s : asList) {
            System.out.println(s);
        }
    }
}

我只从 RDD 返回 10 个空行。

添加了prints 的控制台输出如下所示:

=== APP STARTED : local-1467182320798
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:
next called
54
ä11
map:
next called
60
ß12
map:
next called
12
=====================
constructor called
createValue called
next called
0
ä1
map:
next called
8
ö2
map:
next called
13
ü3
map:
next called
18
ß4
map:
next called
23
ä5
map:
next called
28
ö6
map:
next called
33
ü7
map:
next called
38
ß8
map:
next called
43
ä9
map:
next called
48
ü10
map:










Stopping...

(RDD 数据打印在===== 输出下方(10 个空行!!!)。===== 上方的输出似乎是由RDD.count 调用完成的。在next 方法中正确显示键和值!?我做错了什么?

【问题讨论】:

    标签: hadoop apache-spark hadoop2 recordreader


    【解决方案1】:

    lineKeylineValue 永远不会初始化为 keyvalue 传递给 MyStringRecordReader 中覆盖的 next 方法。因此,当您尝试使用 RecordReader 时,它总是显示 EMPTY 字符串。 如果您希望文件中的记录使用不同的键和值,则需要使用传递给 next 方法的键和值,并使用计算的键和值初始化它们。如果您不打算更改键/值记录,请删除以下内容。每次执行这段代码时,您都会用 EMPTY 字符串和 0L 覆盖从文件中读取的键/值。

    key = lineKey.get();
    value = lineValue.toString();
    

    【讨论】:

    • 我将“createKey”和“createValue”方法的内容更改为“key = lineKey.get()”和“value = lineValue.toString()”上面的行没有成功,不幸的是仍然得到 10 个空行。
    • 在上述问题的代码 sn-p 中也更改了这一点
    • createKey 和 createValue 用于创建适当的键和值对象。这些 API 不会将文件中的数据值分配给键和值。这是“下一个”实施的责任。您的“下一个”实现是覆盖键和值。尝试在下一个方法中打印出键和值,看看我的意思。
    • 我在下一个方法中添加了一些打印,我看到“键”和“值”已正确填充,但我不知道如何调整“createValue”和“createKey” “ 方法。还是我必须调整我的“下一个”方法?但是我不明白...您能举一个编码示例吗?
    • 很高兴我能回答您的问题“我的控制台中只显示了 10 个空行!有人可以帮忙吗?”。现在回到您关于编码示例的另一个问题,我不明白您通过编写自己的 RecordReader 来尝试做什么。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-12-03
    • 2014-03-24
    • 1970-01-01
    • 1970-01-01
    • 2022-11-14
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多