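【Posted】: 2014-05-09 03:40:10
【Problem description】:
Please go easy on me, as I am new to Hadoop and MapReduce.
I have a .tar.gz file that I am trying to read with MapReduce by writing a custom InputFormat that uses CompressionCodecFactory.
I read some documentation on the Internet saying that CompressionCodecFactory can be used to read .tar.gz files, so I implemented it in my code.
The output I get after running the code is complete garbage.
A sample of my input file is shown below: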
"MAY 2013 KOTZEBUE, AK"
"RALPH WIEN MEMORIAL AIRPORT (PAOT)"
"Lat:66° 52'N Long: 162° 37'W Elev (Ground) 30 Feet"
"Time Zone : ALASKA WBAN: 26616 ISSN#: 0197-9833"
01,21,0,11,-11,3,11,54,0," ",4, ,0.0,0.00,30.06,30.09,10.2,36,10.0,25,360,22,360,01
02,25,3,14,-9,5,12,51,0," ",4, ,0.0,0.00,30.09,30.11,6.1,34,7.7,16,010,14,360,02
03,21,1,11,-12,7,11,54,0," ",4, ,0.0,0.00,30.14,30.15,5.0,28,6.0,17,270,16,270,03
04,20,8,14,-10,11,13,51,0,"SN BR",4, ,.001,.0001,30.09,30.11,8.6,26,9.2,20,280,15,280,04
05,29,19,24,-1,21,23,41,0,"SN BR",5, ,0.6,0.06,30.11,30.14,8.1,20,8.5,22,240,20,240,05
06,27,19,23,-3,21,23,42,0,"SN BR",4, ,0.1,0.01,30.14,30.15,8.7,19,9.4,18,200,15,200,06
The output I get is strange:
��@(���]�OX}�s���{Fw8OP��@ig@���e�1L'�����sAm�
��@���Q�eW�t�Ruk�@��AAB.2P�V�� \L}��+����.֏9U]N �)(���d��i(��%F�S<�ҫ ���EN��v�7�Y�%U�>��<�p���`]ݹ�@�#����9Dˬ��M�X2�'��\R��\1- ���V\K1�c_P▒W¨P[ÖÍãÏ2¨▒;O
Below is the code for the custom InputFormat and RecordReader:
InputFormat:
public class SZ_inptfrmtr extends FileInputFormat<Text, Text>
{
    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit split,
            JobConf job_run, Reporter reporter) throws IOException {
        // TODO Auto-generated method stub
        return new SZ_recordreader(job_run, (FileSplit)split);
    }
}
RecordReader:
public class SZ_recordreader implements RecordReader<Text, Text>
{
    FileSplit split;
    JobConf job_run;
    boolean processed=false;
    CompressionCodecFactory compressioncodec=null; // A factory that will find the correct codec(.file) for a given filename.

    public SZ_recordreader(JobConf job_run, FileSplit split)
    {
        this.split=split;
        this.job_run=job_run;
    }

    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    }

    @Override
    public Text createKey() {
        // TODO Auto-generated method stub
        return new Text();
    }

    @Override
    public Text createValue() {
        // TODO Auto-generated method stub
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        // TODO Auto-generated method stub
        return processed ? split.getLength() : 0;
    }

    @Override
    public float getProgress() throws IOException {
        // TODO Auto-generated method stub
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public boolean next(Text key, Text value) throws IOException {
        // TODO Auto-generated method stub
        FSDataInputStream in=null;
        if (!processed)
        {
            byte [] bytestream= new byte [(int) split.getLength()];
            Path path=split.getPath();
            compressioncodec=new CompressionCodecFactory(job_run);
            CompressionCodec code = compressioncodec.getCodec(path);
            // compressioncodec will find the correct codec by visiting the path of the file and store the result in code
            System.out.println(code);
            FileSystem fs= path.getFileSystem(job_run);
            try
            {
                in =fs.open(path);
                IOUtils.readFully(in, bytestream, 0, bytestream.length);
                System.out.println("the input is " +in+ in.toString());
                key.set(path.getName());
                value.set(bytestream, 0, bytestream.length);
            }
            finally
            {
                IOUtils.closeStream(in);
            }
            processed=true;
            return true;
        }
        return false;
    }
}
Can anyone point out the bug?
【Discussion】:
-
What is the output of System.out.println(new CompressionCodecFactory(job_run).toString());? It may be that you do not have a codec for .tar.gz.
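-
One thing that may be worth checking in the RecordReader above: the codec returned by getCodec(path) is printed but never applied. next() reads straight from fs.open(path), so the value would hold the still-compressed bytes, which could explain the garbage. The sketch below is not Hadoop code. It uses java.util.zip so that it runs on its own, and the class and method names (GzipReadSketch, gzip, readRaw, readDecompressed) are made up for illustration. In the Hadoop code the analogous step would presumably be wrapping the opened stream with code.createInputStream(in) and reading from that wrapper instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-alone illustration; not part of the poster's job.
class GzipReadSketch {

    // Build a small gzip payload in memory so the example needs no input file.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Roughly what next() in the question does: treat the stored bytes as text.
    static String readRaw(byte[] stored) {
        return new String(stored, StandardCharsets.UTF_8);
    }

    // Wrap the stream in a decompressor and read until end of stream.
    // The decompressed size is not known up front, so the output grows as needed.
    static String readDecompressed(byte[] stored) {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(stored))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                result.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(result.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String line = "01,21,0,11,-11,3,11,54,0\n";
        byte[] stored = gzip(line);
        System.out.println("raw bytes match the text: " + readRaw(stored).equals(line));
        System.out.println("decompressed bytes match the text: " + readDecompressed(stored).equals(line));
    }
}
```

Two caveats if this is adapted to the RecordReader. First, split.getLength() is the size of the compressed file, so a buffer of that size filled with readFully will not fit the decompressed data; reading in chunks until end of stream avoids that. Second, gzip decoding only removes the .gz layer. What comes out is still a tar archive with its own headers, so the records would need a tar reader on top of the decompressed stream.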